pulsar.schema.schema

  1#
  2# Licensed to the Apache Software Foundation (ASF) under one
  3# or more contributor license agreements.  See the NOTICE file
  4# distributed with this work for additional information
  5# regarding copyright ownership.  The ASF licenses this file
  6# to you under the Apache License, Version 2.0 (the
  7# "License"); you may not use this file except in compliance
  8# with the License.  You may obtain a copy of the License at
  9#
 10#   http://www.apache.org/licenses/LICENSE-2.0
 11#
 12# Unless required by applicable law or agreed to in writing,
 13# software distributed under the License is distributed on an
 14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 15# KIND, either express or implied.  See the License for the
 16# specific language governing permissions and limitations
 17# under the License.
 18#
 19
 20
 21from abc import abstractmethod
 22import json
 23import _pulsar
 24import enum
 25
 26
 27class Schema(object):
 28    def __init__(self, record_cls, schema_type, schema_definition, schema_name):
 29        self._record_cls = record_cls
 30        self._schema_info = _pulsar.SchemaInfo(schema_type, schema_name,
 31                                               json.dumps(schema_definition, indent=True))
 32
 33    @abstractmethod
 34    def encode(self, obj):
 35        pass
 36
 37    @abstractmethod
 38    def decode(self, data):
 39        pass
 40
 41    def schema_info(self):
 42        return self._schema_info
 43
 44    def _validate_object_type(self, obj):
 45        if not isinstance(obj, self._record_cls):
 46            raise TypeError('Invalid record obj of type ' + str(type(obj))
 47                            + ' - expected type is ' + str(self._record_cls))
 48
 49
 50class BytesSchema(Schema):
 51    def __init__(self):
 52        super(BytesSchema, self).__init__(bytes, _pulsar.SchemaType.BYTES, None, 'BYTES')
 53
 54    def encode(self, data):
 55        self._validate_object_type(data)
 56        return data
 57
 58    def decode(self, data):
 59        return data
 60
 61    def __str__(self):
 62        return 'BytesSchema'
 63
 64
 65class StringSchema(Schema):
 66    def __init__(self):
 67        super(StringSchema, self).__init__(str, _pulsar.SchemaType.STRING, None, 'STRING')
 68
 69    def encode(self, obj):
 70        self._validate_object_type(obj)
 71        return obj.encode('utf-8')
 72
 73    def decode(self, data):
 74        return data.decode('utf-8')
 75
 76    def __str__(self):
 77        return 'StringSchema'
 78
 79
 80
 81class JsonSchema(Schema):
 82
 83    def __init__(self, record_cls):
 84        super(JsonSchema, self).__init__(record_cls, _pulsar.SchemaType.JSON,
 85                                         record_cls.schema(), 'JSON')
 86
 87    def _get_serialized_value(self, o):
 88        if isinstance(o, enum.Enum):
 89            return o.value
 90        else:
 91            return o.__dict__
 92
 93    def encode(self, obj):
 94        self._validate_object_type(obj)
 95        del obj.__dict__['_default']
 96        del obj.__dict__['_required']
 97        del obj.__dict__['_required_default']
 98
 99        return json.dumps(obj.__dict__, default=self._get_serialized_value, indent=True).encode('utf-8')
100
101    def decode(self, data):
102        return self._record_cls(**json.loads(data))
class Schema:
28class Schema(object):
29    def __init__(self, record_cls, schema_type, schema_definition, schema_name):
30        self._record_cls = record_cls
31        self._schema_info = _pulsar.SchemaInfo(schema_type, schema_name,
32                                               json.dumps(schema_definition, indent=True))
33
34    @abstractmethod
35    def encode(self, obj):
36        pass
37
38    @abstractmethod
39    def decode(self, data):
40        pass
41
42    def schema_info(self):
43        return self._schema_info
44
45    def _validate_object_type(self, obj):
46        if not isinstance(obj, self._record_cls):
47            raise TypeError('Invalid record obj of type ' + str(type(obj))
48                            + ' - expected type is ' + str(self._record_cls))
Schema(record_cls, schema_type, schema_definition, schema_name)
29    def __init__(self, record_cls, schema_type, schema_definition, schema_name):
30        self._record_cls = record_cls
31        self._schema_info = _pulsar.SchemaInfo(schema_type, schema_name,
32                                               json.dumps(schema_definition, indent=True))
@abstractmethod
def encode(self, obj):
34    @abstractmethod
35    def encode(self, obj):
36        pass
@abstractmethod
def decode(self, data):
38    @abstractmethod
39    def decode(self, data):
40        pass
def schema_info(self):
42    def schema_info(self):
43        return self._schema_info
class BytesSchema(Schema):
51class BytesSchema(Schema):
52    def __init__(self):
53        super(BytesSchema, self).__init__(bytes, _pulsar.SchemaType.BYTES, None, 'BYTES')
54
55    def encode(self, data):
56        self._validate_object_type(data)
57        return data
58
59    def decode(self, data):
60        return data
61
62    def __str__(self):
63        return 'BytesSchema'
BytesSchema()
52    def __init__(self):
53        super(BytesSchema, self).__init__(bytes, _pulsar.SchemaType.BYTES, None, 'BYTES')
def encode(self, data):
55    def encode(self, data):
56        self._validate_object_type(data)
57        return data
def decode(self, data):
59    def decode(self, data):
60        return data
Inherited Members
Schema
schema_info
class StringSchema(Schema):
66class StringSchema(Schema):
67    def __init__(self):
68        super(StringSchema, self).__init__(str, _pulsar.SchemaType.STRING, None, 'STRING')
69
70    def encode(self, obj):
71        self._validate_object_type(obj)
72        return obj.encode('utf-8')
73
74    def decode(self, data):
75        return data.decode('utf-8')
76
77    def __str__(self):
78        return 'StringSchema'
StringSchema()
67    def __init__(self):
68        super(StringSchema, self).__init__(str, _pulsar.SchemaType.STRING, None, 'STRING')
def encode(self, obj):
70    def encode(self, obj):
71        self._validate_object_type(obj)
72        return obj.encode('utf-8')
def decode(self, data):
74    def decode(self, data):
75        return data.decode('utf-8')
Inherited Members
Schema
schema_info
class JsonSchema(Schema):
 82class JsonSchema(Schema):
 83
 84    def __init__(self, record_cls):
 85        super(JsonSchema, self).__init__(record_cls, _pulsar.SchemaType.JSON,
 86                                         record_cls.schema(), 'JSON')
 87
 88    def _get_serialized_value(self, o):
 89        if isinstance(o, enum.Enum):
 90            return o.value
 91        else:
 92            return o.__dict__
 93
 94    def encode(self, obj):
 95        self._validate_object_type(obj)
 96        del obj.__dict__['_default']
 97        del obj.__dict__['_required']
 98        del obj.__dict__['_required_default']
 99
100        return json.dumps(obj.__dict__, default=self._get_serialized_value, indent=True).encode('utf-8')
101
102    def decode(self, data):
103        return self._record_cls(**json.loads(data))
JsonSchema(record_cls)
84    def __init__(self, record_cls):
85        super(JsonSchema, self).__init__(record_cls, _pulsar.SchemaType.JSON,
86                                         record_cls.schema(), 'JSON')
def encode(self, obj):
 94    def encode(self, obj):
 95        self._validate_object_type(obj)
 96        del obj.__dict__['_default']
 97        del obj.__dict__['_required']
 98        del obj.__dict__['_required_default']
 99
100        return json.dumps(obj.__dict__, default=self._get_serialized_value, indent=True).encode('utf-8')
def decode(self, data):
102    def decode(self, data):
103        return self._record_cls(**json.loads(data))
Inherited Members
Schema
schema_info