pulsar.schema.schema_avro

 1#
 2# Licensed to the Apache Software Foundation (ASF) under one
 3# or more contributor license agreements.  See the NOTICE file
 4# distributed with this work for additional information
 5# regarding copyright ownership.  The ASF licenses this file
 6# to you under the Apache License, Version 2.0 (the
 7# "License"); you may not use this file except in compliance
 8# with the License.  You may obtain a copy of the License at
 9#
10#   http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied.  See the License for the
16# specific language governing permissions and limitations
17# under the License.
18#
19
20import _pulsar
21import io
22import enum
23
24from . import Record
25from .schema import Schema
26
# Avro support is optional: record whether the fastavro backend is importable
# so the module can pick the real AvroSchema or a failing placeholder.
try:
    import fastavro
except ImportError:
    HAS_AVRO = False
else:
    HAS_AVRO = True
32
if HAS_AVRO:
    class AvroSchema(Schema):
        """Schema that encodes/decodes payloads as schemaless Avro via ``fastavro``.

        The Avro schema comes either from ``record_cls.schema()`` or from an
        explicit ``schema_definition``. With ``record_cls`` the payloads are
        instances of that class; without it they are plain dicts.
        """

        def __init__(self, record_cls, schema_definition=None):
            """Build the schema from ``record_cls`` or ``schema_definition``.

            :param record_cls: class exposing a ``schema()`` method (presumably a
                ``Record`` subclass), or ``None`` to use ``schema_definition``.
            :param schema_definition: Avro schema handed to fastavro when
                ``record_cls`` is ``None``; ignored otherwise.
            :raises AssertionError: if both arguments are ``None``.
            """
            if record_cls is None and schema_definition is None:
                raise AssertionError("The param record_cls and schema_definition shouldn't be both None.")

            # record_cls takes precedence over an explicit schema definition.
            if record_cls is not None:
                self._schema = record_cls.schema()
            else:
                self._schema = schema_definition
            super(AvroSchema, self).__init__(record_cls, _pulsar.SchemaType.AVRO, self._schema, 'AVRO')

        def _get_serialized_value(self, x):
            """Convert one value into a form fastavro can write.

            Enum members become their ``name``; ``Record`` instances and dicts
            are converted recursively through ``encode_dict``; lists are
            converted element-wise; anything else is returned unchanged.
            """
            if isinstance(x, enum.Enum):
                return x.name
            elif isinstance(x, Record):
                return self.encode_dict(x.__dict__)
            elif isinstance(x, list):
                return [self._get_serialized_value(item) for item in x]
            elif isinstance(x, dict):
                return self.encode_dict(x)
            else:
                return x

        def encode(self, obj):
            """Serialize ``obj`` into schemaless Avro bytes.

            With a ``record_cls``, ``obj`` is checked by the inherited
            ``_validate_object_type`` and its ``__dict__`` is converted with
            ``encode_dict``. Without one, ``obj`` must be a ``dict``; it is
            converted with ``encode_dict`` as well, so enum members and nested
            ``Record`` instances are handled identically on both paths.

            :returns: the encoded payload as ``bytes``.
            :raises ValueError: if no ``record_cls`` was given and ``obj`` is
                not a ``dict``.
            """
            if self._record_cls is not None:
                self._validate_object_type(obj)
                m = self.encode_dict(obj.__dict__)
            elif isinstance(obj, dict):
                # Previously the raw dict reached fastavro untouched, which
                # cannot write Enum members or Record objects.
                m = self.encode_dict(obj)
            else:
                raise ValueError('If using the custom schema, the record data should be dict type.')

            buffer = io.BytesIO()
            fastavro.schemaless_writer(buffer, self._schema, m)
            return buffer.getvalue()

        def encode_dict(self, d):
            """Return a new dict mapping each key of ``d`` to its serialized value."""
            return {k: self._get_serialized_value(v) for k, v in d.items()}

        def decode(self, data):
            """Deserialize schemaless Avro ``data`` using this schema.

            :returns: ``record_cls(**fields)`` when a ``record_cls`` was given,
                otherwise the value produced by ``fastavro.schemaless_reader``.
            """
            d = fastavro.schemaless_reader(io.BytesIO(data), self._schema)
            if self._record_cls is not None:
                return self._record_cls(**d)
            return d

else:
    class AvroSchema(Schema):
        """Placeholder used when ``fastavro`` is missing; construction always fails."""

        def __init__(self, _record_cls, _schema_definition=None):
            # The default mirrors the real AvroSchema signature so that
            # AvroSchema(MyRecord) reports the missing dependency rather than
            # a TypeError about a missing positional argument.
            raise Exception("Avro library support was not found. Make sure to install Pulsar client " +
                            "with Avro support: pip3 install 'pulsar-client[avro]'")

        def encode(self, obj):
            # Not reachable through normal construction: __init__ always raises.
            pass

        def decode(self, data):
            # Not reachable through normal construction: __init__ always raises.
            pass
class AvroSchema(pulsar.schema.schema.Schema):
class AvroSchema(Schema):
    """Schema that encodes/decodes payloads as schemaless Avro via ``fastavro``.

    The Avro schema comes either from ``record_cls.schema()`` or from an
    explicit ``schema_definition``. With ``record_cls`` the payloads are
    instances of that class; without it they are plain dicts.
    """

    def __init__(self, record_cls, schema_definition=None):
        """Build the schema from ``record_cls`` or ``schema_definition``.

        :param record_cls: class exposing a ``schema()`` method (presumably a
            ``Record`` subclass), or ``None`` to use ``schema_definition``.
        :param schema_definition: Avro schema handed to fastavro when
            ``record_cls`` is ``None``; ignored otherwise.
        :raises AssertionError: if both arguments are ``None``.
        """
        if record_cls is None and schema_definition is None:
            raise AssertionError("The param record_cls and schema_definition shouldn't be both None.")

        # record_cls takes precedence over an explicit schema definition.
        if record_cls is not None:
            self._schema = record_cls.schema()
        else:
            self._schema = schema_definition
        super(AvroSchema, self).__init__(record_cls, _pulsar.SchemaType.AVRO, self._schema, 'AVRO')

    def _get_serialized_value(self, x):
        """Convert one value into a form fastavro can write.

        Enum members become their ``name``; ``Record`` instances and dicts
        are converted recursively through ``encode_dict``; lists are
        converted element-wise; anything else is returned unchanged.
        """
        if isinstance(x, enum.Enum):
            return x.name
        elif isinstance(x, Record):
            return self.encode_dict(x.__dict__)
        elif isinstance(x, list):
            return [self._get_serialized_value(item) for item in x]
        elif isinstance(x, dict):
            return self.encode_dict(x)
        else:
            return x

    def encode(self, obj):
        """Serialize ``obj`` into schemaless Avro bytes.

        With a ``record_cls``, ``obj`` is checked by the inherited
        ``_validate_object_type`` and its ``__dict__`` is converted with
        ``encode_dict``. Without one, ``obj`` must be a ``dict``; it is
        converted with ``encode_dict`` as well, so enum members and nested
        ``Record`` instances are handled identically on both paths.

        :returns: the encoded payload as ``bytes``.
        :raises ValueError: if no ``record_cls`` was given and ``obj`` is
            not a ``dict``.
        """
        if self._record_cls is not None:
            self._validate_object_type(obj)
            m = self.encode_dict(obj.__dict__)
        elif isinstance(obj, dict):
            # Previously the raw dict reached fastavro untouched, which
            # cannot write Enum members or Record objects.
            m = self.encode_dict(obj)
        else:
            raise ValueError('If using the custom schema, the record data should be dict type.')

        buffer = io.BytesIO()
        fastavro.schemaless_writer(buffer, self._schema, m)
        return buffer.getvalue()

    def encode_dict(self, d):
        """Return a new dict mapping each key of ``d`` to its serialized value."""
        return {k: self._get_serialized_value(v) for k, v in d.items()}

    def decode(self, data):
        """Deserialize schemaless Avro ``data`` using this schema.

        :returns: ``record_cls(**fields)`` when a ``record_cls`` was given,
            otherwise the value produced by ``fastavro.schemaless_reader``.
        """
        d = fastavro.schemaless_reader(io.BytesIO(data), self._schema)
        if self._record_cls is not None:
            return self._record_cls(**d)
        return d
AvroSchema(record_cls, schema_definition=None)
def __init__(self, record_cls, schema_definition=None):
    """Initialise the Avro schema from a record class or a raw definition.

    :param record_cls: class providing ``schema()`` (presumably a ``Record``
        subclass); when not ``None`` it wins over ``schema_definition``.
    :param schema_definition: Avro schema used only when ``record_cls`` is
        ``None``.
    :raises AssertionError: when neither argument is supplied.
    """
    if record_cls is None and schema_definition is None:
        raise AssertionError("The param record_cls and schema_definition shouldn't be both None.")

    self._schema = schema_definition if record_cls is None else record_cls.schema()
    super(AvroSchema, self).__init__(record_cls, _pulsar.SchemaType.AVRO, self._schema, 'AVRO')
def encode(self, obj):
def encode(self, obj):
    """Serialize ``obj`` into schemaless Avro bytes.

    With a ``record_cls``, ``obj`` is checked by the inherited
    ``_validate_object_type`` and its ``__dict__`` is converted with
    ``encode_dict``. Without one, ``obj`` must be a ``dict``; it is converted
    with ``encode_dict`` as well, so enum members and nested ``Record``
    instances are handled identically on both paths.

    :returns: the encoded payload as ``bytes``.
    :raises ValueError: if no ``record_cls`` was given and ``obj`` is not a
        ``dict``.
    """
    if self._record_cls is not None:
        self._validate_object_type(obj)
        m = self.encode_dict(obj.__dict__)
    elif isinstance(obj, dict):
        # Previously the raw dict reached fastavro untouched, which cannot
        # write Enum members or Record objects.
        m = self.encode_dict(obj)
    else:
        raise ValueError('If using the custom schema, the record data should be dict type.')

    buffer = io.BytesIO()
    fastavro.schemaless_writer(buffer, self._schema, m)
    return buffer.getvalue()
def encode_dict(self, d):
def encode_dict(self, d):
    """Build a fresh dict whose values are ``d``'s values run through
    ``self._get_serialized_value``; keys and insertion order are kept."""
    return {key: self._get_serialized_value(value) for key, value in d.items()}
def decode(self, data):
def decode(self, data):
    """Read one schemaless Avro datum from ``data`` using this schema.

    :returns: the raw decoded value when no ``record_cls`` was configured,
        otherwise ``record_cls`` constructed from the decoded fields.
    """
    decoded = fastavro.schemaless_reader(io.BytesIO(data), self._schema)
    if self._record_cls is None:
        return decoded
    return self._record_cls(**decoded)