Module pulsar.schema.definition

Expand source code
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

from abc import abstractmethod, ABCMeta
from enum import Enum, EnumMeta
from collections import OrderedDict
from six import with_metaclass


def _check_record_or_field(x):
    if (type(x) is type and not issubclass(x, Record)) \
            and not isinstance(x, Field):
        raise Exception('Argument ' + x + ' is not a Record or a Field')


class RecordMeta(type):
    def __new__(metacls, name, parents, dct):
        if name != 'Record':
            # Do not apply this logic to the base class itself
            dct['_fields'] = RecordMeta._get_fields(dct)
            dct['_required'] = False
        return type.__new__(metacls, name, parents, dct)

    @classmethod
    def _get_fields(cls, dct):
        # Build a set of valid fields for this record
        fields = OrderedDict()
        for name, value in dct.items():
            if issubclass(type(value), EnumMeta):
                # Wrap Python enums
                value = _Enum(value)
            elif type(value) == RecordMeta:
                # We expect an instance of a record rather than the class itself
                value = value()

            if isinstance(value, Record) or isinstance(value, Field):
                fields[name] = value
        return fields


class Record(with_metaclass(RecordMeta, object)):

    def __init__(self, *args, **kwargs):
        if args:
            # Only allow keyword args
            raise TypeError('Non-keyword arguments not allowed when initializing Records')

        for k, value in self._fields.items():
            if k in kwargs:
                # Value was overridden at constructor
                self.__setattr__(k, kwargs[k])
            else:
                # Set field to default value
                self.__setattr__(k, value.default())

    @classmethod
    def schema(cls):
        schema = {
            'name': str(cls.__name__),
            'type': 'record',
            'fields': []
        }

        for name in sorted(cls._fields.keys()):
            field = cls._fields[name]
            field_type = field.schema() if field._required else ['null', field.schema()]
            schema['fields'].append({
                'name': name,
                'type': field_type
            })
        return schema

    def __setattr__(self, key, value):
        if key not in self._fields:
            raise AttributeError('Cannot set undeclared field ' + key + ' on record')
        super(Record, self).__setattr__(key, value)

    def __eq__(self, other):
        for field in self._fields:
            if self.__getattribute__(field) != other.__getattribute__(field):
                return False
        return True

    def __str__(self):
        return str(self.__dict__)

    def type(self):
        return str(self.__class__.__name__)


class Field(object):
    def __init__(self, default=None, required=False):
        self._default = default
        self._required = required

    @abstractmethod
    def type(self):
        pass

    def schema(self):
        # For primitive types, the schema would just be the type itself
        return self.type()

    def default(self):
        return self._default

# All types


class Null(Field):
    def type(self):
        return 'null'


class Boolean(Field):
    def type(self):
        return 'boolean'


class Integer(Field):
    def type(self):
        return 'int'


class Long(Field):
    def type(self):
        return 'long'


class Float(Field):
    def type(self):
        return 'float'


class Double(Field):
    def type(self):
        return 'double'


class Bytes(Field):
    def type(self):
        return 'bytes'


class String(Field):
    def type(self):
        return 'string'


# Complex types

class _Enum(Field):
    def __init__(self, enum_type):
        if not issubclass(enum_type, Enum):
            raise Exception(enum_type + " is not a valid Enum type")
        self.enum_type = enum_type
        super(_Enum, self).__init__()

    def type(self):
        return 'enum'

    def schema(self):
        return {
            'type': self.type(),
            'name': self.enum_type.__name__,
            'symbols': [x.name for x in self.enum_type]
        }


class Array(Field):
    def __init__(self, array_type):
        _check_record_or_field(array_type)
        self.array_type = array_type
        super(Array, self).__init__()

    def type(self):
        return 'array'

    def schema(self):
        return {
            'type': self.type(),
            'items': self.array_type.type()
        }


class Map(Field):
    def __init__(self, value_type):
        _check_record_or_field(value_type)
        self.value_type = value_type
        super(Map, self).__init__()

    def type(self):
        return 'map'

    def schema(self):
        return {
            'type': self.type(),
            'values': self.value_type.type()
        }

Classes

class Array (array_type)
Expand source code
class Array(Field):
    def __init__(self, array_type):
        _check_record_or_field(array_type)
        self.array_type = array_type
        super(Array, self).__init__()

    def type(self):
        return 'array'

    def schema(self):
        return {
            'type': self.type(),
            'items': self.array_type.type()
        }

Ancestors

Methods

def schema(self)
Expand source code
def schema(self):
    return {
        'type': self.type(),
        'items': self.array_type.type()
    }
def type(self)
Expand source code
def type(self):
    return 'array'
class Boolean (default=None, required=False)
Expand source code
class Boolean(Field):
    def type(self):
        return 'boolean'

Ancestors

Methods

def type(self)
Expand source code
def type(self):
    return 'boolean'
class Bytes (default=None, required=False)
Expand source code
class Bytes(Field):
    def type(self):
        return 'bytes'

Ancestors

Methods

def type(self)
Expand source code
def type(self):
    return 'bytes'
class Double (default=None, required=False)
Expand source code
class Double(Field):
    def type(self):
        return 'double'

Ancestors

Methods

def type(self)
Expand source code
def type(self):
    return 'double'
class Field (default=None, required=False)
Expand source code
class Field(object):
    def __init__(self, default=None, required=False):
        self._default = default
        self._required = required

    @abstractmethod
    def type(self):
        pass

    def schema(self):
        # For primitive types, the schema would just be the type itself
        return self.type()

    def default(self):
        return self._default

Subclasses

Methods

def default(self)
Expand source code
def default(self):
    return self._default
def schema(self)
Expand source code
def schema(self):
    # For primitive types, the schema would just be the type itself
    return self.type()
def type(self)
Expand source code
@abstractmethod
def type(self):
    pass
class Float (default=None, required=False)
Expand source code
class Float(Field):
    def type(self):
        return 'float'

Ancestors

Methods

def type(self)
Expand source code
def type(self):
    return 'float'
class Integer (default=None, required=False)
Expand source code
class Integer(Field):
    def type(self):
        return 'int'

Ancestors

Methods

def type(self)
Expand source code
def type(self):
    return 'int'
class Long (default=None, required=False)
Expand source code
class Long(Field):
    def type(self):
        return 'long'

Ancestors

Methods

def type(self)
Expand source code
def type(self):
    return 'long'
class Map (value_type)
Expand source code
class Map(Field):
    def __init__(self, value_type):
        _check_record_or_field(value_type)
        self.value_type = value_type
        super(Map, self).__init__()

    def type(self):
        return 'map'

    def schema(self):
        return {
            'type': self.type(),
            'values': self.value_type.type()
        }

Ancestors

Methods

def schema(self)
Expand source code
def schema(self):
    return {
        'type': self.type(),
        'values': self.value_type.type()
    }
def type(self)
Expand source code
def type(self):
    return 'map'
class Null (default=None, required=False)
Expand source code
class Null(Field):
    def type(self):
        return 'null'

Ancestors

Methods

def type(self)
Expand source code
def type(self):
    return 'null'
class Record (*args, **kwargs)
Expand source code
class Record(with_metaclass(RecordMeta, object)):

    def __init__(self, *args, **kwargs):
        if args:
            # Only allow keyword args
            raise TypeError('Non-keyword arguments not allowed when initializing Records')

        for k, value in self._fields.items():
            if k in kwargs:
                # Value was overridden at constructor
                self.__setattr__(k, kwargs[k])
            else:
                # Set field to default value
                self.__setattr__(k, value.default())

    @classmethod
    def schema(cls):
        schema = {
            'name': str(cls.__name__),
            'type': 'record',
            'fields': []
        }

        for name in sorted(cls._fields.keys()):
            field = cls._fields[name]
            field_type = field.schema() if field._required else ['null', field.schema()]
            schema['fields'].append({
                'name': name,
                'type': field_type
            })
        return schema

    def __setattr__(self, key, value):
        if key not in self._fields:
            raise AttributeError('Cannot set undeclared field ' + key + ' on record')
        super(Record, self).__setattr__(key, value)

    def __eq__(self, other):
        for field in self._fields:
            if self.__getattribute__(field) != other.__getattribute__(field):
                return False
        return True

    def __str__(self):
        return str(self.__dict__)

    def type(self):
        return str(self.__class__.__name__)

Static methods

def schema()
Expand source code
@classmethod
def schema(cls):
    schema = {
        'name': str(cls.__name__),
        'type': 'record',
        'fields': []
    }

    for name in sorted(cls._fields.keys()):
        field = cls._fields[name]
        field_type = field.schema() if field._required else ['null', field.schema()]
        schema['fields'].append({
            'name': name,
            'type': field_type
        })
    return schema

Methods

def type(self)
Expand source code
def type(self):
    return str(self.__class__.__name__)
class RecordMeta (*args, **kwargs)

type(object_or_name, bases, dict) type(object) -> the object's type type(name, bases, dict) -> a new type

Expand source code
class RecordMeta(type):
    def __new__(metacls, name, parents, dct):
        if name != 'Record':
            # Do not apply this logic to the base class itself
            dct['_fields'] = RecordMeta._get_fields(dct)
            dct['_required'] = False
        return type.__new__(metacls, name, parents, dct)

    @classmethod
    def _get_fields(cls, dct):
        # Build a set of valid fields for this record
        fields = OrderedDict()
        for name, value in dct.items():
            if issubclass(type(value), EnumMeta):
                # Wrap Python enums
                value = _Enum(value)
            elif type(value) == RecordMeta:
                # We expect an instance of a record rather than the class itself
                value = value()

            if isinstance(value, Record) or isinstance(value, Field):
                fields[name] = value
        return fields

Ancestors

  • builtins.type
class String (default=None, required=False)
Expand source code
class String(Field):
    def type(self):
        return 'string'

Ancestors

Methods

def type(self)
Expand source code
def type(self):
    return 'string'