
The Pulsar Python client

The Pulsar Python client library is a wrapper over the existing C++ client library and exposes all of the same features. You can find the code in the python directory of the C++ client code in the Pulsar GitHub repository.

Installation​

You can install the pulsar-client library either from PyPI, using pip, or by building the library from source.

Installation using pip​

To install the pulsar-client library as a pre-built package using the pip package manager:

pip install pulsar-client==2.4.0

Installation via PyPI is available for the following Python versions:

| Platform | Supported Python versions |
|:---------|:--------------------------|
| MacOS 10.11 (El Capitan), 10.12 (Sierra), 10.13 (High Sierra), 10.14 (Mojave) | 2.7, 3.7 |
| Linux | 2.7, 3.4, 3.5, 3.6, 3.7 |
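Before running pip, it can help to check whether the current interpreter appears in the table. The following is a minimal sketch using only the standard library. The helper name has_prebuilt_package is hypothetical, and the check looks only at the platform name and the Python major.minor version, not at the macOS release.

```python
import platform
import sys

# Supported interpreter versions for the pre-built pip package,
# copied from the table above. Keys follow platform.system() naming
# ("Darwin" is what macOS reports).
SUPPORTED = {
    "Darwin": {(2, 7), (3, 7)},
    "Linux": {(2, 7), (3, 4), (3, 5), (3, 6), (3, 7)},
}


def has_prebuilt_package(system, version):
    """Return True if the table lists (major, minor) for this platform."""
    return tuple(version[:2]) in SUPPORTED.get(system, set())


if __name__ == "__main__":
    ok = has_prebuilt_package(platform.system(), sys.version_info)
    print("pre-built package listed for this interpreter:", ok)
```

If the check returns False, building from source as described in the next section is the remaining option.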

Installing from source​

To install the pulsar-client library by building from source, follow these instructions and compile the Pulsar C++ client library. That also builds the Python binding for the library.

To install the built Python bindings:


$ git clone https://github.com/apache/pulsar
$ cd pulsar/pulsar-client-cpp/python
$ sudo python setup.py install

API Reference​

The complete Python API reference is available at api/python.

Examples​

The following examples show how to use the pulsar-client library from Python.

Producer example​

This example creates a Python producer for the my-topic topic and sends 10 messages on that topic:


import pulsar

client = pulsar.Client('pulsar://localhost:6650')

producer = client.create_producer('my-topic')

for i in range(10):
    producer.send(('Hello-%d' % i).encode('utf-8'))

client.close()
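If sending fails partway through, the example above never reaches client.close(). One way to guard against that is a try/finally wrapper, sketched below. The function name send_greetings is hypothetical. It accepts any object that offers create_producer() and close(), so it works with pulsar.Client but does not import it.

```python
def send_greetings(client, topic, count=10):
    """Send `count` UTF-8 encoded messages, then always close the client."""
    try:
        producer = client.create_producer(topic)
        for i in range(count):
            producer.send(('Hello-%d' % i).encode('utf-8'))
    finally:
        # Runs even if create_producer() or send() raises, so the
        # connection to the broker is not leaked.
        client.close()


# Usage against a real broker (same URL as the example above):
#   import pulsar
#   send_greetings(pulsar.Client('pulsar://localhost:6650'), 'my-topic')
```

Passing the client in as an argument keeps the helper easy to exercise with a stand-in object when no broker is available.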

Consumer example​

This example creates a consumer with the my-subscription subscription on the my-topic topic, listens for incoming messages, prints the content and ID of messages that arrive, and acknowledges each message to the Pulsar broker:


import pulsar

client = pulsar.Client('pulsar://localhost:6650')

consumer = client.subscribe('my-topic', 'my-subscription')

while True:
    msg = consumer.receive()
    try:
        print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
        # Acknowledge successful processing of the message
        consumer.acknowledge(msg)
    except Exception:
        # Message failed to be processed
        consumer.negative_acknowledge(msg)

client.close()
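Because the loop above never exits, the final client.close() is not reached. A bounded variant can pass a timeout to receive() and stop once the topic goes quiet. The sketch below assumes, as the negative acknowledgement example on this page also does, that receive() raises an exception when the timeout expires. The names consume_until_idle and handle are hypothetical.

```python
def consume_until_idle(consumer, handle, timeout_millis=1000):
    """Receive messages until none arrives within `timeout_millis`.

    Returns the number of messages that were acknowledged.
    """
    acked = 0
    while True:
        try:
            msg = consumer.receive(timeout_millis)
        except Exception:
            # Assumption: the client signals a receive timeout by raising.
            break
        try:
            handle(msg)
        except Exception:
            # Processing failed: ask the broker to redeliver later.
            consumer.negative_acknowledge(msg)
        else:
            consumer.acknowledge(msg)
            acked += 1
    return acked
```

After the function returns, the caller can close the client normally. Note that this sketch treats any exception from receive() as a timeout, which is a simplification.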

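This example shows negative acknowledgement in use. The consumer negatively acknowledges ten messages, receives them again after the broker redelivers them, and then acknowledges them.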


from pulsar import Client, schema

client = Client('pulsar://localhost:6650')

consumer = client.subscribe('negative_acks','test',schema=schema.StringSchema())
producer = client.create_producer('negative_acks',schema=schema.StringSchema())

for i in range(10):
    print('send msg "hello-%d"' % i)
    producer.send_async('hello-%d' % i, callback=None)

producer.flush()

for i in range(10):
    msg = consumer.receive()
    consumer.negative_acknowledge(msg)
    print('receive and nack msg "%s"' % msg.data())

for i in range(10):
    msg = consumer.receive()
    consumer.acknowledge(msg)
    print('receive and ack msg "%s"' % msg.data())

try:
    # No more messages expected; wait at most 100 ms
    msg = consumer.receive(100)
except Exception:
    print("no more msg")

client.close()
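How quickly a negatively acknowledged message comes back depends on the redelivery delay configured on the consumer. The sketch below sets it through the negative_ack_redelivery_delay_ms keyword of subscribe(). Treat that keyword as an assumption to verify against the API reference for your installed version. The wrapper name subscribe_with_nack_delay is hypothetical.

```python
def subscribe_with_nack_delay(client, topic, subscription, delay_ms=1000):
    """Subscribe so that negatively acknowledged messages are redelivered
    after roughly `delay_ms` milliseconds."""
    return client.subscribe(
        topic,
        subscription,
        # Keyword taken from the client's subscribe() signature; verify it
        # against the pulsar-client version you have installed.
        negative_ack_redelivery_delay_ms=delay_ms,
    )


# Usage against a real broker:
#   import pulsar
#   client = pulsar.Client('pulsar://localhost:6650')
#   consumer = subscribe_with_nack_delay(client, 'negative_acks', 'test')
```

A short delay keeps a demonstration like the one above fast, while a longer delay gives a failing downstream dependency time to recover before the message is retried.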

Reader interface example​

The Pulsar Python API also exposes the Pulsar reader interface. The following example assumes that client and msg already exist, as in the consumer examples above:


# MessageId taken from a previously fetched message
msg_id = msg.message_id()

reader = client.create_reader('my-topic', msg_id)

while True:
    msg = reader.read_next()
    print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
    # No acknowledgment
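A reader loop that blocks forever is awkward in scripts that should finish once they have caught up with the topic. The generator below is a minimal sketch that stops when a read times out. It assumes read_next() accepts a timeout in milliseconds and raises when it expires. The name iter_messages is hypothetical.

```python
def iter_messages(reader, timeout_millis=1000):
    """Yield (message_id, data) pairs until a read times out."""
    while True:
        try:
            msg = reader.read_next(timeout_millis)
        except Exception:
            # Assumption: read_next() raises when the timeout expires.
            return
        yield msg.message_id(), msg.data()


# Usage against a real broker, starting from a known message ID:
#   reader = client.create_reader('my-topic', msg_id)
#   for message_id, data in iter_messages(reader):
#       print(message_id, data)
```

As with the loop above, nothing is acknowledged, because a reader tracks its own position instead of using a subscription.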

Schema​

Declaring and validating schema​

A schema can be declared by passing a class that inherits from pulsar.schema.Record and defines the fields as class variables. For example:


from pulsar.schema import *

class Example(Record):
    a = String()
    b = Integer()
    c = Boolean()

With this simple schema definition, you can create producer, consumer, and reader instances that refer to it.


producer = client.create_producer(
    topic='my-topic',
    schema=AvroSchema(Example))

producer.send(Example(a='Hello', b=1))

When the producer is created, the Pulsar broker validates that the existing topic schema is indeed of "Avro" type and that the format is compatible with the schema definition of the Example class.

If there is a mismatch, the producer creation raises an exception.

Once a producer is created with a certain schema definition, it only accepts objects that are instances of the declared schema class.

Similarly, a consumer or reader returns an instance of the schema record class rather than the raw bytes:


consumer = client.subscribe(
    topic='my-topic',
    subscription_name='my-subscription',
    schema=AvroSchema(Example))

while True:
    msg = consumer.receive()
    ex = msg.value()
    try:
        print("Received message a={} b={} c={}".format(ex.a, ex.b, ex.c))
        # Acknowledge successful processing of the message
        consumer.acknowledge(msg)
    except Exception:
        # Message failed to be processed
        consumer.negative_acknowledge(msg)

Supported schema types​

Pulsar provides several built-in schema types. All of them are defined in the pulsar.schema package.

| Schema | Notes |
|:-------|:------|
| BytesSchema | Gets the raw payload as a bytes object. No serialization or deserialization is performed. This is the default schema mode. |
| StringSchema | Encodes and decodes the payload as a UTF-8 string. Uses str objects. |
| JsonSchema | Requires a record definition. Serializes the record into a standard JSON payload. |
| AvroSchema | Requires a record definition. Serializes the record in Avro format. |
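To make the difference between these modes concrete, the sketch below builds string-style and JSON-style payloads with the standard library alone. It illustrates the shape of the bytes on the wire and is not the library's implementation. The record_fields dict is a hypothetical stand-in for a Record instance, and the exact bytes the client produces may differ.

```python
import json

# StringSchema-style payload: the text encoded as UTF-8 bytes.
string_payload = 'hello-0'.encode('utf-8')

# JsonSchema-style payload: the record's fields as a JSON object.
# The dict stands in for a Record instance with fields a, b and c.
record_fields = {'a': 'Hello', 'b': 1, 'c': None}
json_payload = json.dumps(record_fields).encode('utf-8')

# Decoding reverses each step.
assert string_payload.decode('utf-8') == 'hello-0'
assert json.loads(json_payload.decode('utf-8')) == record_fields
```

With BytesSchema, neither step happens: the application is responsible for producing and interpreting the bytes itself.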

Schema definition reference​

The schema definition is done through a class that inherits from pulsar.schema.Record.

The class can have any number of fields, each of which is either a pulsar.schema.Field type or another, nested Record. All field types are defined in the pulsar.schema package and correspond to the Avro field types.

| Field Type | Python Type | Notes |
|:-----------|:------------|:------|
| Boolean | bool | |
| Integer | int | |
| Long | int | |
| Float | float | |
| Double | float | |
| Bytes | bytes | |
| String | str | |
| Array | list | The type of the items must be specified. |
| Map | dict | The key is always a String. The value type must be specified. |

Additionally, any Python Enum type can be used as a valid field type.
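The correspondence with Avro can be sketched as a lookup from field type name to Avro primitive type name. The helper avro_record below is hypothetical and covers only the primitive types in the table. The schema the library actually generates may differ, for example in how it represents fields that are not required.

```python
import json

# Field type name in pulsar.schema -> Avro primitive type name.
AVRO_TYPE = {
    'Boolean': 'boolean',
    'Integer': 'int',
    'Long': 'long',
    'Float': 'float',
    'Double': 'double',
    'Bytes': 'bytes',
    'String': 'string',
}


def avro_record(name, fields):
    """Build an Avro-style record schema from (field_name, field_type) pairs."""
    return {
        'type': 'record',
        'name': name,
        'fields': [{'name': n, 'type': AVRO_TYPE[t]} for n, t in fields],
    }


schema = avro_record('Example', [('a', 'String'), ('b', 'Integer'), ('c', 'Boolean')])
print(json.dumps(schema, indent=2))
```

Array and Map correspond to the Avro complex types array and map, which additionally carry the item or value type.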

Fields parameters​

When adding a field, you can pass these parameters to the constructor:

| Argument | Default | Notes |
|:---------|:--------|:------|
| default | None | Sets a default value for the field. For example: a = Integer(default=5) |
| required | False | Marks the field as "required" and sets it accordingly in the schema. |

Schema definition examples​

Simple definition​

class Example(Record):
    a = String()
    b = Integer()
    c = Array(String())
    i = Map(String())

Using enums​

from enum import Enum

class Color(Enum):
    red = 1
    green = 2
    blue = 3

class Example(Record):
    name = String()
    color = Color

Complex types​

class MySubRecord(Record):
    x = Integer()
    y = Long()
    z = String()

class Example(Record):
    a = String()
    sub = MySubRecord()