pulsar.functions.serde

serde.py: SerDe defines the interface for serialization/deserialization.

Every time a message is read from a Pulsar topic, the SerDe is invoked to deserialize the bytes into an object before the process method is invoked.

Any time a Python object needs to be written back to Pulsar, it is serialized into bytes before writing.

 1#!/usr/bin/env python
 2#
 3# Licensed to the Apache Software Foundation (ASF) under one
 4# or more contributor license agreements.  See the NOTICE file
 5# distributed with this work for additional information
 6# regarding copyright ownership.  The ASF licenses this file
 7# to you under the Apache License, Version 2.0 (the
 8# "License"); you may not use this file except in compliance
 9# with the License.  You may obtain a copy of the License at
10#
11#   http://www.apache.org/licenses/LICENSE-2.0
12#
13# Unless required by applicable law or agreed to in writing,
14# software distributed under the License is distributed on an
15# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16# KIND, either express or implied.  See the License for the
17# specific language governing permissions and limitations
18# under the License.
19#
20
21# -*- encoding: utf-8 -*-
22
23# Licensed to the Apache Software Foundation (ASF) under one
24# or more contributor license agreements.  See the NOTICE file
25# distributed with this work for additional information
26# regarding copyright ownership.  The ASF licenses this file
27# to you under the Apache License, Version 2.0 (the
28# "License"); you may not use this file except in compliance
29# with the License.  You may obtain a copy of the License at
30#
31#   http://www.apache.org/licenses/LICENSE-2.0
32#
33# Unless required by applicable law or agreed to in writing,
34# software distributed under the License is distributed on an
35# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
36# KIND, either express or implied.  See the License for the
37# specific language governing permissions and limitations
38# under the License.
39#
40"""serde.py: SerDe defines the interface for serialization/deserialization.
41# Every time a message is read from a Pulsar topic, the SerDe is invoked to
42# deserialize the bytes into an object before invoking the process method.
43# Any time a Python object needs to be written back to Pulsar, it is
44# serialized into bytes before writing.
45"""
46from abc import abstractmethod
47
48import pickle
49
class SerDe(object):
  """Interface for Serialization/Deserialization.

  Implementations convert between the raw bytes of Pulsar messages and
  Python objects.

  NOTE(review): this class derives from ``object`` rather than ``abc.ABC``,
  so the ``@abstractmethod`` markers below are informational only -- Python
  does not stop anyone from instantiating ``SerDe`` itself or a subclass that
  omits one of the methods.
  """

  @abstractmethod
  def serialize(self, input):
    """Serialize input message into bytes.

    Args:
      input: the object to serialize.

    Returns:
      The serialized bytes. The base version does nothing and returns None.
    """
    pass

  @abstractmethod
  def deserialize(self, input_bytes):
    """Deserialize input_bytes into an object.

    Args:
      input_bytes: the raw message payload.

    Returns:
      The deserialized object. The base version does nothing and returns None.
    """
    pass
61
class PickleSerDe(SerDe):
  """Pickle-based serializer.

  SECURITY WARNING: ``deserialize`` calls ``pickle.loads``, which can execute
  arbitrary code embedded in the payload. Only use this SerDe on topics whose
  producers are fully trusted.
  """

  def serialize(self, input):
    """Serialize ``input`` into bytes with ``pickle.dumps``."""
    return pickle.dumps(input)

  def deserialize(self, input_bytes):
    """Deserialize ``input_bytes`` into an object with ``pickle.loads``."""
    # SECURITY: message payloads come from the topic; never unpickle bytes
    # from an untrusted producer.
    return pickle.loads(input_bytes)
69
class IdentitySerDe(SerDe):
  """Simple SerDe that converts values to their string form and back.

  ``serialize`` accepts values whose exact type is ``int``, ``float``,
  ``complex`` or ``str`` (encoded as the UTF-8 bytes of ``str(value)``) and
  ``bytes`` (passed through unchanged).

  ``deserialize`` decodes UTF-8 and returns the first successful conversion
  in ``int``, ``float``, ``complex``, ``str`` order. The round trip is
  therefore not always exact: the string ``"42"`` comes back as the int
  ``42``, and UTF-8-decodable ``bytes`` come back as ``str``.
  """

  def __init__(self):
    # Order matters: deserialize tries these conversions first to last.
    self._types = [int, float, complex, str]

  def serialize(self, input):
    """Serialize input message into bytes.

    Raises:
      TypeError: if the exact type of ``input`` is not one of the supported
        types (subclasses such as ``bool`` are rejected).
    """
    kind = type(input)
    if kind in self._types:
      return str(input).encode('utf-8')
    if kind is not bytes:
      raise TypeError("IdentitySerde cannot serialize object of type %s" % kind)
    return input

  def deserialize(self, input_bytes):
    """Deserialize input_bytes into an object.

    Returns ``input_bytes`` unchanged when it cannot be decoded as UTF-8 (or
    has no ``decode`` method), or when none of the conversions succeed.
    """
    try:
      text = input_bytes.decode('utf-8')
    except (AttributeError, UnicodeDecodeError):
      # Not UTF-8 text (or not bytes-like): hand the payload back as-is.
      return input_bytes
    for typ in self._types:
      try:
        return typ(text)
      except (TypeError, ValueError):
        # This conversion does not apply; try the next, more general type.
        continue
    return input_bytes
class SerDe:
class SerDe(object):
  """Interface for Serialization/Deserialization.

  Implementations convert between the raw bytes of Pulsar messages and
  Python objects.

  NOTE(review): this class derives from ``object`` rather than ``abc.ABC``,
  so the ``@abstractmethod`` markers below are informational only -- Python
  does not stop anyone from instantiating ``SerDe`` itself or a subclass that
  omits one of the methods.
  """

  @abstractmethod
  def serialize(self, input):
    """Serialize input message into bytes.

    Args:
      input: the object to serialize.

    Returns:
      The serialized bytes. The base version does nothing and returns None.
    """
    pass

  @abstractmethod
  def deserialize(self, input_bytes):
    """Deserialize input_bytes into an object.

    Args:
      input_bytes: the raw message payload.

    Returns:
      The deserialized object. The base version does nothing and returns None.
    """
    pass

Interface for Serialization/Deserialization

SerDe()
@abstractmethod
def serialize(self, input):
@abstractmethod
def serialize(self, input):
  """Serialize input message into bytes.

  The base version has no implementation: it does nothing and returns None.
  Concrete SerDe classes are expected to override it.
  """

Serialize input message into bytes

@abstractmethod
def deserialize(self, input_bytes):
@abstractmethod
def deserialize(self, input_bytes):
  """Deserialize input_bytes into an object.

  The base version has no implementation: it does nothing and returns None.
  Concrete SerDe classes are expected to override it.
  """
  pass

Deserialize input_bytes into an object

class PickleSerDe(SerDe):
class PickleSerDe(SerDe):
  """Pickle-based serializer.

  SECURITY WARNING: ``deserialize`` calls ``pickle.loads``, which can execute
  arbitrary code embedded in the payload. Only use this SerDe on topics whose
  producers are fully trusted.
  """

  def serialize(self, input):
    """Serialize ``input`` into bytes with ``pickle.dumps``."""
    return pickle.dumps(input)

  def deserialize(self, input_bytes):
    """Deserialize ``input_bytes`` into an object with ``pickle.loads``."""
    # SECURITY: message payloads come from the topic; never unpickle bytes
    # from an untrusted producer.
    return pickle.loads(input_bytes)

Pickle-based serializer

PickleSerDe()
def serialize(self, input):
def serialize(self, input):
  """Serialize input message into bytes via ``pickle.dumps``."""
  payload = pickle.dumps(input)
  return payload

Serialize input message into bytes

def deserialize(self, input_bytes):
def deserialize(self, input_bytes):
  """Deserialize input_bytes into an object using ``pickle.loads``.

  SECURITY: unpickling can execute arbitrary code embedded in the payload;
  only use this on bytes produced by fully trusted publishers.
  """
  return pickle.loads(input_bytes)

Deserialize input_bytes into an object

class IdentitySerDe(SerDe):
class IdentitySerDe(SerDe):
  """Simple SerDe that converts values to their string form and back.

  ``serialize`` accepts values whose exact type is ``int``, ``float``,
  ``complex`` or ``str`` (encoded as the UTF-8 bytes of ``str(value)``) and
  ``bytes`` (passed through unchanged).

  ``deserialize`` decodes UTF-8 and returns the first successful conversion
  in ``int``, ``float``, ``complex``, ``str`` order. The round trip is
  therefore not always exact: the string ``"42"`` comes back as the int
  ``42``, and UTF-8-decodable ``bytes`` come back as ``str``.
  """

  def __init__(self):
    # Order matters: deserialize tries these conversions first to last.
    self._types = [int, float, complex, str]

  def serialize(self, input):
    """Serialize input message into bytes.

    Raises:
      TypeError: if the exact type of ``input`` is not one of the supported
        types (subclasses such as ``bool`` are rejected).
    """
    kind = type(input)
    if kind in self._types:
      return str(input).encode('utf-8')
    if kind is not bytes:
      raise TypeError("IdentitySerde cannot serialize object of type %s" % kind)
    return input

  def deserialize(self, input_bytes):
    """Deserialize input_bytes into an object.

    Returns ``input_bytes`` unchanged when it cannot be decoded as UTF-8 (or
    has no ``decode`` method), or when none of the conversions succeed.
    """
    try:
      text = input_bytes.decode('utf-8')
    except (AttributeError, UnicodeDecodeError):
      # Not UTF-8 text (or not bytes-like): hand the payload back as-is.
      return input_bytes
    for typ in self._types:
      try:
        return typ(text)
      except (TypeError, ValueError):
        # This conversion does not apply; try the next, more general type.
        continue
    return input_bytes

Simple SerDe that just converts to string and back

IdentitySerDe()
def __init__(self):
  # Conversion candidates, in the order deserialize tries them.
  self._types = list((int, float, complex, str))
def serialize(self, input):
def serialize(self, input):
  """Serialize input message into bytes.

  Values whose exact type is listed in ``self._types`` are encoded as the
  UTF-8 bytes of ``str(value)``; ``bytes`` pass through unchanged; anything
  else raises TypeError.
  """
  kind = type(input)
  if kind in self._types:
    return str(input).encode('utf-8')
  if kind != bytes:
    raise TypeError("IdentitySerde cannot serialize object of type %s" % kind)
  return input

Serialize input message into bytes

def deserialize(self, input_bytes):
def deserialize(self, input_bytes):
  """Deserialize input_bytes into an object.

  Decodes the payload as UTF-8 once, then returns the first successful
  conversion using the candidates in ``self._types``, tried in order.
  Returns ``input_bytes`` unchanged when it cannot be decoded (or has no
  ``decode`` method), or when no conversion succeeds. Unexpected errors
  propagate instead of being silently swallowed.
  """
  try:
    text = input_bytes.decode('utf-8')
  except (AttributeError, UnicodeDecodeError):
    # Not UTF-8 text (or not bytes-like): hand the payload back as-is.
    return input_bytes
  for typ in self._types:
    try:
      return typ(text)
    except (TypeError, ValueError):
      # This conversion does not apply; try the next, more general type.
      continue
  return input_bytes

Deserialize input_bytes into an object