Pulsar Python client
You can use a Pulsar Python client to create producers, consumers, and readers. For Pulsar features that Python clients support, see Client Feature Matrix.
Installation
Use pip to install the client library. The following command installs version 2.11.0:
pip install 'pulsar-client==2.11.0'
You can install optional components alongside the client library:
# avro serialization
pip install 'pulsar-client[avro]==2.11.0'
# functions runtime
pip install 'pulsar-client[functions]==2.11.0'
# all optional components
pip install 'pulsar-client[all]==2.11.0'
Installation via PyPI is available for the following platforms and Python versions:
Platform | Supported Python versions |
---|---|
macOS (>= 11.0) | 3.7, 3.8, 3.9 and 3.10 |
Linux (including Alpine Linux) | 3.7, 3.8, 3.9 and 3.10 |
Connection URLs
To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL.
You can assign Pulsar protocol URLs to specific clusters and use the pulsar scheme. The following is an example of localhost with the default port 6650:
pulsar://localhost:6650
If you have multiple brokers, separate the IP:port pairs with commas:
pulsar://localhost:6650,localhost:6651,localhost:6652
If you use TLS authentication, add +ssl to the scheme:
pulsar+ssl://pulsar.us-west.example.com:6651
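The URL rules above (a pulsar or pulsar+ssl scheme, and comma-separated host:port pairs for multiple brokers) can be captured in a small helper. The following is a minimal sketch: build_service_url is a hypothetical name, not part of the pulsar-client API, and it only assembles the string that you would pass to pulsar.Client.

```python
from typing import Iterable, Tuple


def build_service_url(brokers: Iterable[Tuple[str, int]], tls: bool = False) -> str:
    """Build a Pulsar service URL from (host, port) pairs.

    Hypothetical helper: uses the pulsar:// scheme, or pulsar+ssl:// when tls
    is True, and joins multiple host:port entries with commas.
    """
    hosts = ",".join(f"{host}:{port}" for host, port in brokers)
    if not hosts:
        raise ValueError("at least one broker is required")
    scheme = "pulsar+ssl" if tls else "pulsar"
    return f"{scheme}://{hosts}"


print(build_service_url([("localhost", 6650)]))
print(build_service_url([("localhost", 6650), ("localhost", 6651), ("localhost", 6652)]))
print(build_service_url([("pulsar.us-west.example.com", 6651)], tls=True))
```

For example, pulsar.Client(build_service_url([("localhost", 6650)])) uses the same URL as the first example in this section.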
API reference
All methods of the producer, consumer, and reader in the Pulsar Python client are thread-safe. See the API docs for more details.
Release notes
For the changelog of Pulsar Python clients, see release notes.
Examples
You can find a variety of Python code examples for the pulsar-client library.
Producer example
The following example creates a Python producer for the my-topic topic and sends 10 messages on that topic:
import pulsar
client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer('my-topic')
for i in range(10):
    producer.send(('Hello-%d' % i).encode('utf-8'))
client.close()
Enable chunking
Message chunking enables Pulsar to process large payload messages by splitting a message into chunks on the producer side and aggregating the chunked messages on the consumer side.
Message chunking is disabled by default. The following example shows how to enable message chunking when creating a producer:
producer = client.create_producer(
    topic,
    chunking_enabled=True,
    # Chunking cannot be used together with batching
    batching_enabled=False
)
By default, the producer splits a large message into chunks based on the maximum message size (maxMessageSize) configured on the broker (5 MB by default).
To enable chunking, you must also disable batching (batching_enabled=False), as the example above does.
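To illustrate the idea behind chunking, the sketch below splits a payload into pieces no larger than a size limit and reassembles them in order on the receiving side. It is a conceptual model only, under the assumption that each chunk carries a message identifier, its index, and the total chunk count; Chunk, split_into_chunks, and reassemble are hypothetical names, and the sketch does not reproduce the Pulsar client's wire format or the way it sizes chunks.

```python
import math
import uuid
from dataclasses import dataclass
from typing import List


@dataclass
class Chunk:
    message_uuid: str  # identifies which message the chunk belongs to
    chunk_id: int      # position of the chunk within the message
    num_chunks: int    # total number of chunks for the message
    data: bytes


def split_into_chunks(payload: bytes, max_chunk_size: int) -> List[Chunk]:
    """Producer side: split payload into chunks of at most max_chunk_size bytes."""
    if max_chunk_size <= 0:
        raise ValueError("max_chunk_size must be positive")
    num_chunks = max(1, math.ceil(len(payload) / max_chunk_size))
    message_uuid = str(uuid.uuid4())
    return [
        Chunk(message_uuid, i, num_chunks, payload[i * max_chunk_size:(i + 1) * max_chunk_size])
        for i in range(num_chunks)
    ]


def reassemble(chunks: List[Chunk]) -> bytes:
    """Consumer side: rebuild the payload once every chunk has arrived."""
    if not chunks:
        raise ValueError("no chunks to reassemble")
    ordered = sorted(chunks, key=lambda c: c.chunk_id)
    if len({c.message_uuid for c in ordered}) != 1:
        raise ValueError("chunks belong to different messages")
    if [c.chunk_id for c in ordered] != list(range(ordered[0].num_chunks)):
        raise ValueError("missing or duplicate chunks")
    return b"".join(c.data for c in ordered)


payload = b"0123456789abcdefghij"  # 20 bytes
chunks = split_into_chunks(payload, max_chunk_size=8)
print([len(c.data) for c in chunks])
# Chunks may arrive out of order; reassembly sorts them by chunk_id
print(reassemble(list(reversed(chunks))) == payload)
```

With chunking enabled, the client performs the equivalent of both steps for you; the sketch only shows why a consumer has to buffer chunks until all of them have arrived.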
Consumer example
The following example creates a consumer with the my-subscription subscription name on the my-topic topic, receives incoming messages, prints the content and ID of each message that arrives, and acknowledges each message to the Pulsar broker. If processing a message fails, the consumer negatively acknowledges it so that it is redelivered later.
import pulsar
client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe('my-topic', 'my-subscription')
while True:
    msg = consumer.receive()
    try:
        print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
        # Acknowledge successful processing of the message
        consumer.acknowledge(msg)
    except Exception:
        # Message failed to be processed
        consumer.negative_acknowledge(msg)
client.close()
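The acknowledge-or-retry logic in this example can be factored into a reusable function. In the minimal sketch below, handle_one and FakeConsumer are hypothetical names: handle_one relies only on the receive(), acknowledge(), and negative_acknowledge() methods used in the example, and FakeConsumer is an in-memory stand-in so that the control flow can be exercised without a broker.

```python
from typing import Any, Callable


def handle_one(consumer: Any, handler: Callable[[Any], None]) -> bool:
    """Receive one message, process it, then acknowledge or negatively acknowledge it.

    Returns True if the message was acknowledged, False if it was negatively
    acknowledged because handler raised an exception.
    """
    msg = consumer.receive()
    try:
        handler(msg)
    except Exception:
        # Processing failed: request redelivery of this message later
        consumer.negative_acknowledge(msg)
        return False
    # Processing succeeded: acknowledge so the message is not redelivered
    consumer.acknowledge(msg)
    return True


class FakeConsumer:
    """Hypothetical in-memory stand-in for a consumer, for illustration only."""

    def __init__(self, messages):
        self.pending = list(messages)
        self.acked = []
        self.nacked = []

    def receive(self):
        return self.pending.pop(0)

    def acknowledge(self, msg):
        self.acked.append(msg)

    def negative_acknowledge(self, msg):
        self.nacked.append(msg)


def parse_number(msg):
    # Example handler: raises ValueError for non-numeric payloads
    int(msg)


fake = FakeConsumer([b"1", b"oops", b"3"])
results = [handle_one(fake, parse_number) for _ in range(3)]
print(results)
```

Unlike the example, the sketch calls acknowledge() outside the try block, so a failure while acknowledging is not mistaken for a processing failure and does not trigger a negative acknowledgment. With a real consumer, the handler receives a Pulsar message and would typically read msg.data().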
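The following example shows how negative acknowledgment works. The consumer negatively acknowledges the first 10 messages it receives, so they are redelivered; it then receives the same messages again and acknowledges them. Redelivery happens after the delay set by the negative_ack_redelivery_delay_ms parameter of subscribe() (60000 ms by default), so the second loop waits for that delay.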
from pulsar import Client, schema
client = Client('pulsar://localhost:6650')
consumer = client.subscribe('negative_acks', 'test', schema=schema.StringSchema())
producer = client.create_producer('negative_acks', schema=schema.StringSchema())
for i in range(10):
    print('send msg "hello-%d"' % i)
    producer.send_async('hello-%d' % i, callback=None)
producer.flush()
# Negatively acknowledge every message so that it is redelivered later
for i in range(10):
    msg = consumer.receive()
    consumer.negative_acknowledge(msg)
    print('receive and nack msg "%s"' % msg.value())
# Receive the redelivered messages and acknowledge them
for i in range(10):
    msg = consumer.receive()
    consumer.acknowledge(msg)
    print('receive and ack msg "%s"' % msg.value())
try:
    # No more messages expected: receive() with a 100 ms timeout raises an exception
    msg = consumer.receive(100)
except Exception:
    print("no more msg")
client.close()
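The final try block above treats an exception from receive(100) as the signal that no more messages are available. The sketch below generalizes that pattern into a hypothetical drain helper that acknowledges messages until a receive times out. As in the example, it catches any exception; FakeConsumer is an illustrative in-memory stand-in for a consumer, not a Pulsar class.

```python
from typing import Any, List


def drain(consumer: Any, timeout_millis: int = 100) -> List[Any]:
    """Receive and acknowledge messages until receive() raises, then return them.

    Like the example above, any exception from receive(timeout_millis) is
    treated as "no more messages".
    """
    received = []
    while True:
        try:
            msg = consumer.receive(timeout_millis)
        except Exception:
            return received
        consumer.acknowledge(msg)
        received.append(msg)


class FakeConsumer:
    """Hypothetical in-memory stand-in for a consumer, for illustration only."""

    def __init__(self, messages):
        self.pending = list(messages)
        self.acked = []

    def receive(self, timeout_millis=None):
        if not self.pending:
            # Simulates a receive() that times out on an empty topic
            raise TimeoutError("no message within %s ms" % timeout_millis)
        return self.pending.pop(0)

    def acknowledge(self, msg):
        self.acked.append(msg)


fake = FakeConsumer(["hello-0", "hello-1", "hello-2"])
drained = drain(fake)
print(drained)
```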
Configure chunking
You can limit the maximum number of chunked messages that a consumer keeps pending concurrently by configuring the max_pending_chunked_message parameter. When this threshold is reached, the consumer drops the oldest pending chunked message: if auto_ack_oldest_chunked_message_on_queue_full is True, it silently acknowledges the message; otherwise, it asks the broker to redeliver the message later.
The following is an example of how to configure message chunking.
consumer = client.subscribe(
    topic,
    "my-subscription",
    max_pending_chunked_message=10,
    auto_ack_oldest_chunked_message_on_queue_full=False
)
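The following toy model sketches the bookkeeping that these two parameters control. PendingChunkBuffer is hypothetical and is not how the Pulsar client is implemented: max_pending plays the role of max_pending_chunked_message, auto_ack_oldest plays the role of auto_ack_oldest_chunked_message_on_queue_full, and the model assumes that the oldest incomplete message is the one dropped when the buffer is full.

```python
from collections import OrderedDict


class PendingChunkBuffer:
    """Toy model of a consumer buffering incomplete chunked messages.

    Illustration only; not the Pulsar client's implementation.
    """

    def __init__(self, max_pending, auto_ack_oldest):
        self.max_pending = max_pending          # like max_pending_chunked_message
        self.auto_ack_oldest = auto_ack_oldest  # like auto_ack_oldest_chunked_message_on_queue_full
        self.pending = OrderedDict()            # message id -> chunks received so far
        self.acked = []                         # dropped and silently acknowledged
        self.redeliver = []                     # dropped and marked for redelivery

    def add_chunk(self, message_id, chunk):
        if message_id not in self.pending and len(self.pending) >= self.max_pending:
            # Buffer is full: drop the oldest incomplete message
            oldest, _ = self.pending.popitem(last=False)
            if self.auto_ack_oldest:
                self.acked.append(oldest)
            else:
                self.redeliver.append(oldest)
        self.pending.setdefault(message_id, []).append(chunk)

    def complete(self, message_id):
        # All chunks arrived: remove the message and return its payload
        return b"".join(self.pending.pop(message_id))


redeliver_buf = PendingChunkBuffer(max_pending=2, auto_ack_oldest=False)
ack_buf = PendingChunkBuffer(max_pending=2, auto_ack_oldest=True)
for buf in (redeliver_buf, ack_buf):
    buf.add_chunk("m1", b"a")
    buf.add_chunk("m2", b"b")
    buf.add_chunk("m2", b"c")  # chunk of an already pending message: no eviction
    buf.add_chunk("m3", b"d")  # a third pending message evicts m1
print(list(redeliver_buf.pending), redeliver_buf.redeliver, ack_buf.acked)
```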
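Reader interface example
The following example uses the Pulsar Python API to create a reader on the my-topic topic. The reader starts from a given message ID and reads messages without acknowledging them:
import pulsar
client = pulsar.Client('pulsar://localhost:6650')
# Use a MessageId taken from a previously fetched message (msg.message_id()),
# or pulsar.MessageId.earliest to start reading from the beginning of the topic
msg_id = pulsar.MessageId.earliest
reader = client.create_reader('my-topic', msg_id)
while True:
    msg = reader.read_next()
    print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
    # No acknowledgment: readers do not acknowledge messages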
Multi-topic subscriptions
In addition to subscribing a consumer to a single Pulsar topic, you can also subscribe to multiple topics simultaneously. To use multi-topic subscriptions, supply either a regular expression (regex) or a list of topics. If you select topics via regex, all topics must be within the same Pulsar namespace.
The following is an example:
import re
import pulsar
client = pulsar.Client('pulsar://localhost:6650')
# Subscribe to every topic in public/default whose name starts with "topic-"
consumer = client.subscribe(re.compile('persistent://public/default/topic-.*'), 'my-subscription')
while True:
    msg = consumer.receive()
    try:
        print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
        # Acknowledge successful processing of the message
        consumer.acknowledge(msg)
    except Exception:
        # Message failed to be processed
        consumer.negative_acknowledge(msg)
client.close()
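Because the topic argument is a regular expression, .* (not a bare *) is what matches an arbitrary suffix. The following sketch checks which of some hypothetical topic names a pattern selects, assuming the pattern is matched against the full topic name; select_topics is a hypothetical helper, and under that assumption the pattern topic-* matches none of the names.

```python
import re

# Hypothetical topic names in the public/default namespace
topics = [
    "persistent://public/default/topic-1",
    "persistent://public/default/topic-2",
    "persistent://public/default/other-topic",
]


def select_topics(pattern, names):
    # Keep the names that the whole pattern matches (full-name matching assumed)
    return [name for name in names if pattern.fullmatch(name)]


print(select_topics(re.compile("persistent://public/default/topic-.*"), topics))
print(select_topics(re.compile("persistent://public/default/topic-*"), topics))
```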
Create a Python client with multiple advertised listeners
To ensure that clients in both internal and external networks can connect to a Pulsar cluster, Pulsar supports multiple advertised listeners (advertisedListeners).
The following example creates a Python client that connects through the advertised listener named external:
import pulsar
client = pulsar.Client('pulsar://localhost:6650', listener_name='external')
Schema
Supported schema types
You can use different built-in schema types in Pulsar. All the definitions are in the pulsar.schema package.
Schema | Notes |
---|---|
BytesSchema | Gets the raw payload as a bytes object. No serialization or deserialization is performed. This is the default schema mode. |
StringSchema | Encodes/decodes the payload as a UTF-8 string. Uses str objects. |
JsonSchema | Requires a record definition. Serializes the record into a standard JSON payload. |
AvroSchema | Requires a record definition. Serializes the record in Avro format. |
Schema definition reference
A schema is defined through a class that inherits from pulsar.schema.Record.
This class has a number of fields, each of which is either a pulsar.schema.Field type or another nested Record. All the field types are defined in the pulsar.schema package and match the Avro field types.
Field Type | Python Type | Notes |
---|---|---|
Boolean | bool | |
Integer | int | |
Long | int | |
Float | float | |
Double | float | |
Bytes | bytes | |
String | str | |
Array | list | Requires the item type to be specified. |
Map | dict | Keys are always String. Requires the value type to be specified. |
Additionally, any Python Enum type can be used as a valid field type.
Field parameters
When adding a field, you can use these parameters in the constructor.
Argument | Default | Notes |
---|---|---|
default | None | Sets a default value for the field, such as a = Integer(default=5). |
required | False | Marks the field as "required", which is reflected in the generated schema. |
Schema definition examples
Simple definition
from pulsar.schema import Array, Integer, Map, Record, String
class Example(Record):
    a = String()
    b = Integer()
    c = Array(String())
    i = Map(String())
Using enums
from enum import Enum
from pulsar.schema import Record, String
class Color(Enum):
    red = 1
    green = 2
    blue = 3
class Example(Record):
    name = String()
    color = Color
Complex types
from pulsar.schema import Integer, Long, Record, String
class MySubRecord(Record):
    x = Integer()
    y = Long()
    z = String()
class Example(Record):
    a = String()
    sub = MySubRecord()
Set namespace for Avro schema
Set the namespace for the Avro Record schema using the special field _avro_namespace.
from pulsar.schema import Integer, Record, String
class NamespaceDemo(Record):
    _avro_namespace = 'xxx.xxx.xxx'
    x = String()
    y = Integer()
The resulting schema definition is as follows:
{
    "name": "NamespaceDemo",
    "namespace": "xxx.xxx.xxx",
    "type": "record",
    "fields": [
        {"name": "x", "type": ["null", "string"]},
        {"name": "y", "type": ["null", "int"]}
    ]
}
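To relate this JSON to the class definition, the sketch below rebuilds the same schema by hand with plain dictionaries. The helpers avro_field and record_schema are hypothetical, not functions of pulsar.schema, and they assume, as the output above suggests, that a field not marked required is declared as a union with "null".

```python
import json


def avro_field(name, avro_type, required=False):
    # Assumption: non-required fields become a union with "null", as in the output above
    return {"name": name, "type": avro_type if required else ["null", avro_type]}


def record_schema(name, namespace, fields):
    # The namespace entry is what the _avro_namespace field contributes
    return {"name": name, "namespace": namespace, "type": "record", "fields": fields}


schema = record_schema(
    "NamespaceDemo",
    "xxx.xxx.xxx",
    [avro_field("x", "string"), avro_field("y", "int")],
)
print(json.dumps(schema, indent=4))
```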
Declare and validate schema
When a producer is created, the Pulsar broker validates that the existing topic schema is of the correct type and that its format is compatible with the schema definition of the class. If the topic schema is incompatible with the schema definition, producer creation fails with an exception.
Once a producer is created with a certain schema definition, it only accepts objects that are instances of the declared schema class.
Similarly, a consumer or reader returns an object that is an instance of the schema record class (through msg.value()) rather than raw bytes.
Example
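import pulsar
from pulsar.schema import Array, AvroSchema, Integer, Map, Record, String
# Same record definition as in the simple definition example
class Example(Record):
    a = String()
    b = Integer()
    c = Array(String())
    i = Map(String())
client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe(
    topic='my-topic',
    subscription_name='my-subscription',
    schema=AvroSchema(Example))
while True:
    msg = consumer.receive()
    # value() deserializes the payload into an Example instance
    ex = msg.value()
    try:
        print("Received message a={} b={} c={}".format(ex.a, ex.b, ex.c))
        # Acknowledge successful processing of the message
        consumer.acknowledge(msg)
    except Exception:
        # Message failed to be processed
        consumer.negative_acknowledge(msg)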
For more code examples, see Schema - Get started.
End-to-end encryption
Pulsar encryption allows applications to encrypt messages at producers and decrypt messages at consumers. See Get started for more details.