pulsar.schema.schema

  1#
  2# Licensed to the Apache Software Foundation (ASF) under one
  3# or more contributor license agreements.  See the NOTICE file
  4# distributed with this work for additional information
  5# regarding copyright ownership.  The ASF licenses this file
  6# to you under the Apache License, Version 2.0 (the
  7# "License"); you may not use this file except in compliance
  8# with the License.  You may obtain a copy of the License at
  9#
 10#   http://www.apache.org/licenses/LICENSE-2.0
 11#
 12# Unless required by applicable law or agreed to in writing,
 13# software distributed under the License is distributed on an
 14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 15# KIND, either express or implied.  See the License for the
 16# specific language governing permissions and limitations
 17# under the License.
 18#
 19
 20
 21from abc import abstractmethod
 22import json
 23import _pulsar
 24import enum
 25
 26
 27class Schema(object):
 28    def __init__(self, record_cls, schema_type, schema_definition, schema_name):
 29        self._record_cls = record_cls
 30        self._schema_info = _pulsar.SchemaInfo(schema_type, schema_name,
 31                                               json.dumps(schema_definition, indent=True))
 32
 33    @abstractmethod
 34    def encode(self, obj):
 35        pass
 36
 37    @abstractmethod
 38    def decode(self, data):
 39        pass
 40
 41    def schema_info(self):
 42        return self._schema_info
 43
 44    def _validate_object_type(self, obj):
 45        if not isinstance(obj, self._record_cls):
 46            raise TypeError('Invalid record obj of type ' + str(type(obj))
 47                            + ' - expected type is ' + str(self._record_cls))
 48
 49
 50class BytesSchema(Schema):
 51    def __init__(self):
 52        super(BytesSchema, self).__init__(bytes, _pulsar.SchemaType.BYTES, None, 'BYTES')
 53
 54    def encode(self, data):
 55        self._validate_object_type(data)
 56        return data
 57
 58    def decode(self, data):
 59        return data
 60
 61    def __str__(self):
 62        return 'BytesSchema'
 63
 64
 65class StringSchema(Schema):
 66    def __init__(self):
 67        super(StringSchema, self).__init__(str, _pulsar.SchemaType.STRING, None, 'STRING')
 68
 69    def encode(self, obj):
 70        self._validate_object_type(obj)
 71        return obj.encode('utf-8')
 72
 73    def decode(self, data):
 74        return data.decode('utf-8')
 75
 76    def __str__(self):
 77        return 'StringSchema'
 78
 79
 80def remove_reserved_key(data):
 81    if '_default' in data:
 82        del data['_default']
 83    if '_required' in data:
 84        del data['_required']
 85    if '_required_default' in data:
 86        del data['_required_default']
 87
 88
 89class JsonSchema(Schema):
 90
 91    def __init__(self, record_cls):
 92        super(JsonSchema, self).__init__(record_cls, _pulsar.SchemaType.JSON,
 93                                         record_cls.schema(), 'JSON')
 94
 95    def _get_serialized_value(self, o):
 96        if isinstance(o, enum.Enum):
 97            return o.value
 98        else:
 99            data = o.__dict__.copy()
100            remove_reserved_key(data)
101            return data
102
103    def encode(self, obj):
104        self._validate_object_type(obj)
105        # Copy the dict of the object as to not modify the provided object via the reference provided
106        data = obj.__dict__.copy()
107        remove_reserved_key(data)
108        return json.dumps(data, default=self._get_serialized_value, indent=True).encode('utf-8')
109
110    def decode(self, data):
111        return self._record_cls(**json.loads(data))
class Schema(object):
    """Base class for Pulsar client schemas.

    A schema pairs a Python record type with the ``_pulsar.SchemaInfo`` that
    describes it, and converts record objects to and from raw payload bytes.
    Subclasses override ``encode`` and ``decode``.

    NOTE: this class does not use ``abc.ABCMeta``, so ``@abstractmethod`` is
    not enforced here; calling the base ``encode``/``decode`` returns None.
    """

    def __init__(self, record_cls, schema_type, schema_definition, schema_name):
        """Remember the record type and build the schema descriptor.

        :param record_cls: type accepted by ``encode`` (checked by
            ``_validate_object_type``).
        :param schema_type: a ``_pulsar.SchemaType`` member.
        :param schema_definition: JSON-serializable definition; ``None`` is
            stored as the JSON text ``null``.
        :param schema_name: name stored in the ``SchemaInfo``.
        """
        self._record_cls = record_cls
        # indent=True behaves like indent=1 because bool is an int subclass.
        definition_text = json.dumps(schema_definition, indent=True)
        self._schema_info = _pulsar.SchemaInfo(schema_type, schema_name, definition_text)

    @abstractmethod
    def encode(self, obj):
        """Serialize ``obj`` into payload bytes (implemented by subclasses)."""

    @abstractmethod
    def decode(self, data):
        """Deserialize payload ``data`` (implemented by subclasses)."""

    def schema_info(self):
        """Return the ``_pulsar.SchemaInfo`` built in ``__init__``."""
        return self._schema_info

    def _validate_object_type(self, obj):
        """Raise TypeError unless ``obj`` is an instance of the record type."""
        if isinstance(obj, self._record_cls):
            return
        raise TypeError('Invalid record obj of type {} - expected type is {}'.format(
            type(obj), self._record_cls))
class BytesSchema(Schema):
    """Pass-through schema whose records are raw ``bytes`` objects."""

    def __init__(self):
        super(BytesSchema, self).__init__(
            record_cls=bytes,
            schema_type=_pulsar.SchemaType.BYTES,
            schema_definition=None,
            schema_name='BYTES',
        )

    def encode(self, data):
        """Return ``data`` unchanged once it is confirmed to be ``bytes``.

        :raises TypeError: if ``data`` is not a ``bytes`` instance.
        """
        self._validate_object_type(data)
        payload = data
        return payload

    def decode(self, data):
        """Return the received payload unchanged (no validation)."""
        payload = data
        return payload

    def __str__(self):
        return 'BytesSchema'
Inherited Members
Schema
schema_info
class StringSchema(Schema):
    """Schema whose records are ``str`` values carried as UTF-8 bytes."""

    def __init__(self):
        super(StringSchema, self).__init__(
            record_cls=str,
            schema_type=_pulsar.SchemaType.STRING,
            schema_definition=None,
            schema_name='STRING',
        )

    def encode(self, obj):
        """Encode the text ``obj`` as UTF-8 bytes.

        :raises TypeError: if ``obj`` is not a ``str`` instance.
        """
        self._validate_object_type(obj)
        encoded = obj.encode('utf-8')
        return encoded

    def decode(self, data):
        """Decode the UTF-8 payload ``data`` back into text."""
        text = data.decode('utf-8')
        return text

    def __str__(self):
        return 'StringSchema'
Inherited Members
Schema
schema_info
def remove_reserved_key(data):
    """Delete the record bookkeeping keys from ``data`` in place.

    Removes ``_default``, ``_required`` and ``_required_default`` when they are
    present; absent keys are ignored. Returns None.
    """
    for reserved in ('_default', '_required', '_required_default'):
        if reserved in data:
            del data[reserved]
class JsonSchema(Schema):
    """Schema that serializes record objects as UTF-8 encoded JSON."""

    def __init__(self, record_cls):
        """Build a JSON schema for ``record_cls``.

        :param record_cls: record type; the result of ``record_cls.schema()``
            is stored as the schema definition.
        """
        super(JsonSchema, self).__init__(record_cls, _pulsar.SchemaType.JSON,
                                         record_cls.schema(), 'JSON')

    @staticmethod
    def _record_to_dict(o):
        """Return a shallow copy of ``o.__dict__`` without reserved keys."""
        # Copy so that removing reserved keys never mutates the caller's object.
        data = o.__dict__.copy()
        remove_reserved_key(data)
        return data

    def _get_serialized_value(self, o):
        """``default`` hook for ``json.dumps``: convert a non-JSON value.

        Enum members serialize as their ``value``; other objects (e.g. nested
        records) serialize as their attribute dict minus reserved keys.

        :raises TypeError: if ``o`` has no ``__dict__`` (e.g. ``bytes``, ``set``,
            ``__slots__``-only objects), as the ``json`` module requires of a
            ``default`` hook. Previously this surfaced as AttributeError.
        """
        if isinstance(o, enum.Enum):
            return o.value
        if not hasattr(o, '__dict__'):
            raise TypeError('Object of type {} is not JSON serializable'.format(
                type(o).__name__))
        return self._record_to_dict(o)

    def encode(self, obj):
        """Serialize ``obj`` into UTF-8 JSON bytes.

        :raises TypeError: if ``obj`` is not an instance of the record type, or
            contains a value JSON cannot represent.
        """
        self._validate_object_type(obj)
        return json.dumps(self._record_to_dict(obj), default=self._get_serialized_value,
                          indent=True).encode('utf-8')

    def decode(self, data):
        """Parse the JSON payload ``data`` and build a record from its keys.

        NOTE(review): nested records and enums arrive as plain dicts / values;
        whether ``record_cls`` converts them back is not visible here — confirm.
        """
        return self._record_cls(**json.loads(data))
Inherited Members
Schema
schema_info