Skip to main content

Use Python client

Create a producer​

The following example creates a Python producer for the my-topic topic and sends 10 messages on that topic:

import pulsar

client = pulsar.Client('pulsar://localhost:6650')

producer = client.create_producer('my-topic')

for i in range(10):
producer.send(('Hello-%d' % i).encode('utf-8'))

client.close()

Create a consumer​

The following example creates a consumer with the my-subscription subscription name on the my-topic topic, receives incoming messages, prints the content and ID of messages that arrive, and acknowledges each message to the Pulsar broker.

import pulsar

client = pulsar.Client('pulsar://localhost:6650')

consumer = client.subscribe('my-topic', 'my-subscription')

while True:
msg = consumer.receive()
try:
print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
# Acknowledge successful processing of the message
consumer.acknowledge(msg)
except Exception:
# Message failed to be processed
consumer.negative_acknowledge(msg)

client.close()

This example shows how to configure negative acknowledgment.

from pulsar import Client, schema
client = Client('pulsar://localhost:6650')
consumer = client.subscribe('negative_acks','test',schema=schema.StringSchema())
producer = client.create_producer('negative_acks',schema=schema.StringSchema())
for i in range(10):
print('send msg "hello-%d"' % i)
producer.send_async('hello-%d' % i, callback=None)
producer.flush()
for i in range(10):
msg = consumer.receive()
consumer.negative_acknowledge(msg)
print('receive and nack msg "%s"' % msg.data())
for i in range(10):
msg = consumer.receive()
consumer.acknowledge(msg)
print('receive and ack msg "%s"' % msg.data())
try:
# No more messages expected
msg = consumer.receive(100)
except:
print("no more msg")
pass

Create a reader​

You can use the Pulsar Python API to use the Pulsar reader interface. Here's an example:

# MessageId taken from a previously fetched message
msg_id = msg.message_id()

reader = client.create_reader('my-topic', msg_id)

while True:
msg = reader.read_next()
print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
# No acknowledgment

Work with schema​

Supported schema types​

You can use different built-in schema types in Pulsar. All the definitions are in the pulsar.schema package.

SchemaNotes
BytesSchemaGet the raw payload as a bytes object. No serialization/deserialization are performed. This is the default schema mode
StringSchemaEncode/decode payload as a UTF-8 string. Uses str objects
JsonSchemaRequire record definition. Serializes the record into standard JSON payload
AvroSchemaRequire record definition. Serializes in AVRO format

Schema definition reference​

The schema definition is done through a class that inherits from pulsar.schema.Record.

This class has a number of fields that can be of either pulsar.schema.Field type or another nested Record. All the fields are specified in the pulsar.schema package. The fields are matching the AVRO field types.

Field TypePython TypeNotes
Booleanbool
Integerint
Longint
Floatfloat
Doublefloat
Bytesbytes
Stringstr
ArraylistNeed to specify record type for items.
MapdictKey is always String. Need to specify value type.

Additionally, any Python Enum type can be used as a valid field type.

Fields parameters​

When adding a field, you can use these parameters in the constructor.

ArgumentDefaultNotes
defaultNoneSet a default value for the field, such as a = Integer(default=5).
requiredFalseMark the field as "required". It is set in the schema accordingly.

Schema definition examples​

Simple definition​
class Example(Record):
a = String()
b = Integer()
c = Array(String())
i = Map(String())
Using enums​
from enum import Enum

class Color(Enum):
red = 1
green = 2
blue = 3

class Example(Record):
name = String()
color = Color
Complex types​
class MySubRecord(Record):
x = Integer()
y = Long()
z = String()

class Example(Record):
a = String()
sub = MySubRecord()
Set namespace for Avro schema​

Set the namespace for the Avro Record schema using the special field _avro_namespace.

class NamespaceDemo(Record):
_avro_namespace = 'xxx.xxx.xxx'
x = String()
y = Integer()

The schema definition is like this.

{
"name": "NamespaceDemo", "namespace": "xxx.xxx.xxx", "type": "record", "fields": [
{"name": "x", "type": ["null", "string"]},
{"name": "y", "type": ["null", "int"]}
]
}

Declare and validate schema​

Before the producer is created, the Pulsar broker validates that the existing topic schema is the correct type and that the format is compatible with the schema definition of a class. If the format of the topic schema is incompatible with the schema definition, an exception occurs in the producer creation.

Once a producer is created with a certain schema definition, it only accepts objects that are instances of the declared schema class.

Similarly, for a consumer or reader, the consumer returns an object (which is an instance of the schema record class) rather than raw bytes.

Example

consumer = client.subscribe(
topic='my-topic',
subscription_name='my-subscription',
schema=AvroSchema(Example) )

while True:
msg = consumer.receive()
ex = msg.value()
try:
print("Received message a={} b={} c={}".format(ex.a, ex.b, ex.c))
# Acknowledge successful processing of the message
consumer.acknowledge(msg)
except Exception:
# Message failed to be processed
consumer.negative_acknowledge(msg)

For more code examples, see Schema - Get started.