pulsar.functions.serde
serde.py: SerDe defines the interface for serialization/deserialization.
Everytime a message is read from pulsar topic, the serde is invoked to
serialize the bytes into an object before invoking the process method.
Anytime a python object needs to be written back to pulsar, it is
serialized into bytes before writing.
1#!/usr/bin/env python 2# 3# Licensed to the Apache Software Foundation (ASF) under one 4# or more contributor license agreements. See the NOTICE file 5# distributed with this work for additional information 6# regarding copyright ownership. The ASF licenses this file 7# to you under the Apache License, Version 2.0 (the 8# "License"); you may not use this file except in compliance 9# with the License. You may obtain a copy of the License at 10# 11# http://www.apache.org/licenses/LICENSE-2.0 12# 13# Unless required by applicable law or agreed to in writing, 14# software distributed under the License is distributed on an 15# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 16# KIND, either express or implied. See the License for the 17# specific language governing permissions and limitations 18# under the License. 19# 20 21# -*- encoding: utf-8 -*- 22 23# Licensed to the Apache Software Foundation (ASF) under one 24# or more contributor license agreements. See the NOTICE file 25# distributed with this work for additional information 26# regarding copyright ownership. The ASF licenses this file 27# to you under the Apache License, Version 2.0 (the 28# "License"); you may not use this file except in compliance 29# with the License. You may obtain a copy of the License at 30# 31# http://www.apache.org/licenses/LICENSE-2.0 32# 33# Unless required by applicable law or agreed to in writing, 34# software distributed under the License is distributed on an 35# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 36# KIND, either express or implied. See the License for the 37# specific language governing permissions and limitations 38# under the License. 39# 40"""serde.py: SerDe defines the interface for serialization/deserialization. 41# Everytime a message is read from pulsar topic, the serde is invoked to 42# serialize the bytes into an object before invoking the process method. 43# Anytime a python object needs to be written back to pulsar, it is 44# serialized into bytes before writing. 45""" 46from abc import abstractmethod 47 48import pickle 49 50class SerDe(object): 51 """Interface for Serialization/Deserialization""" 52 @abstractmethod 53 def serialize(self, input): 54 """Serialize input message into bytes""" 55 pass 56 57 @abstractmethod 58 def deserialize(self, input_bytes): 59 """Serialize input_bytes into an object""" 60 pass 61 62class PickleSerDe(SerDe): 63 """Pickle based serializer""" 64 def serialize(self, input): 65 return pickle.dumps(input) 66 67 def deserialize(self, input_bytes): 68 return pickle.loads(input_bytes) 69 70class IdentitySerDe(SerDe): 71 """Simple Serde that just conversion to string and back""" 72 def __init__(self): 73 self._types = [int, float, complex, str] 74 75 def serialize(self, input): 76 if type(input) in self._types: 77 return str(input).encode('utf-8') 78 if type(input) == bytes: 79 return input 80 raise TypeError("IdentitySerde cannot serialize object of type %s" % type(input)) 81 82 def deserialize(self, input_bytes): 83 for typ in self._types: 84 try: 85 return typ(input_bytes.decode('utf-8')) 86 except: 87 pass 88 return input_bytes
class
SerDe:
51class SerDe(object): 52 """Interface for Serialization/Deserialization""" 53 @abstractmethod 54 def serialize(self, input): 55 """Serialize input message into bytes""" 56 pass 57 58 @abstractmethod 59 def deserialize(self, input_bytes): 60 """Serialize input_bytes into an object""" 61 pass
Interface for Serialization/Deserialization
63class PickleSerDe(SerDe): 64 """Pickle based serializer""" 65 def serialize(self, input): 66 return pickle.dumps(input) 67 68 def deserialize(self, input_bytes): 69 return pickle.loads(input_bytes)
Pickle based serializer
71class IdentitySerDe(SerDe): 72 """Simple Serde that just conversion to string and back""" 73 def __init__(self): 74 self._types = [int, float, complex, str] 75 76 def serialize(self, input): 77 if type(input) in self._types: 78 return str(input).encode('utf-8') 79 if type(input) == bytes: 80 return input 81 raise TypeError("IdentitySerde cannot serialize object of type %s" % type(input)) 82 83 def deserialize(self, input_bytes): 84 for typ in self._types: 85 try: 86 return typ(input_bytes.decode('utf-8')) 87 except: 88 pass 89 return input_bytes
Simple Serde that just conversion to string and back