The Pulsar Python client
Pulsar 的 Python 客户端库是对现有 C++ 客户端库的封装, 它所包含的功能与 C++ 客户端相同。 你可以在 C++ 客户端源码的 python
子目录中找到 Pulsar Python 客户端的相关源码 。
安装
你可以通过 PyPi,用 pip 的方式或者通过源代码来安装 pulsar-client
。
使用pip安装
使用包管理工具 pip 安装已经打包好的 pulsar-client
:
$ pip install pulsar-client==2.4.2
以下 Python 版本支持通过 PyPi 的方式进行安装:
平台 | 支持的 Python 版本 |
---|---|
MacOS |
10.11 (El Capitan) — 10.12 (Sierra) —
10.13 (High Sierra) — 10.14 (Mojave) | 2.7, 3.7 |
| Linux | 2.7, 3.4, 3.5, 3.6, 3.7 |
通过源代码安装
通过源码安装 pulsar-client
,请参照安装步骤中的示例进行操作并编译 Pulsar C++ 客户端库。 同时构建 Python 包到拓展库中。
安装已经构建好的 Python 包:
$ git clone https://github.com/apache/pulsar
$ cd pulsar/pulsar-client-cpp/python
$ sudo python setup.py install
API 手册:
在 api/python 上有完整的 Python API 手册。
示例
下面是 pulsar-client
库的各种 Python 代码示例。
生产者示例
创建一个 Python 生产者,主题为 my-topic
,并向这个主题发送了10条消息。
import pulsar
client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer('my-topic')
for i in range(10):
producer.send(('Hello-%d' % i).encode('utf-8'))
client.close()
消费者示例
创建一个消费者,使用 my-subscription
订阅主题 my-topic
,监听传入的消息,打印内容和到达消息的ID,并且向 Pulsar broker 确认每条消息:
consumer = client.subscribe('my-topic', 'my-subscription')
while True:
msg = consumer.receive()
try:
print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
# 确认已经成功处理消息
consumer.acknowledge(msg)
except:
# 消息处理失败
consumer.negative_acknowledge(msg)
client.close()
读者接口示例
You can use the Pulsar Python API to use the Pulsar reader interface. 下面是一个示例:
# MessageId 取自先前获取的消息
msg_id = msg.message_id()
reader = client.create_reader('my-topic', msg_id)
while True:
msg = reader.read_next()
print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
# 无确认操作
Schema
声明和验证 schema
A schema can be declared by passing a class that inherits from pulsar.schema.Record
and defines the fields as class variables. 例如:
from pulsar.schema import *
class Example(Record):
a = String()
b = Integer()
c = Boolean()
用这种简单的 schema 定义方式,我们可以创建生产者,消费者和读者实例。
producer = client.create_producer(
topic='my-topic',
schema=AvroSchema(Example) )
producer.send(Example(a='Hello', b=1))
创建生产者时,Pulsar broker 会验证现有的 topic schema 是否是“ Avro ”类型,并且格式与 Example
类的 schema 定义兼容。
如果出现不匹配,生产者创建将会出现异常。
一旦一个生产者创建了一个 schema 定义,它只接受声明的 schema 类别的实例。
同样,对于消费者和阅读者,消费者会返回一个所有 schema 记录的对象实例,而不是原始的字节:
consumer = client.subscribe(
topic='my-topic',
subscription_name='my-subscription',
schema=AvroSchema(Example) )
while True:
msg = consumer.receive()
ex = msg.value()
try:
print("Received message a={} b={} c={}".format(ex.a, ex.b, ex.c))
# # 确认已经成功处理消息
consumer.acknowledge(msg)
except:
# 消息处理失败
consumer.negative_acknowledge(msg)
支持的 Schema 类型
There are different builtin schema types that can be used in Pulsar. All the definitions are in the pulsar.schema
package.
Schema | 备注 |
---|---|
BytesSchema | Get the raw payload as a bytes object. No serialization/deserialization are performed. This is the default schema mode |
StringSchema | Encode/decode payload as a UTF-8 string. Uses str objects |
JsonSchema | Require record definition. Serializes the record into standard JSON payload |
AvroSchema | Require record definition. Serializes in AVRO format |
Schema 定义参考
schema 定义是通过继承 pulsar.schema.Record
类完成的。
这个类可以有许多字段,可以是任何一种 pulsar.schema.Field
类型,甚至是 Record
的嵌套。 所有字段在 pulsar.schema
包中指定。 所有的字段匹配 AVRO 字段类型。
字段类型 | Python 类型 | 备注 |
---|---|---|
Boolean | bool | |
Integer | int | |
Long | int | |
Float | float | |
Double | float | |
Bytes | bytes | |
String | str | |
Array | list | 需要指定标签的记录类型 |
Map | dict | Key is always String . Need to specify value type |
另外,任何 Python ` Enum </ code> 类型都可以用作有效的字段类型。
字段参数
在添加字段时,这些参数会用于构造器:
参数 | 默认值 | 备注 |
---|---|---|
default` |
Schema 定义示例
简单定义
class Example(Record):
a = String()
b = Integer()
c = Array(String())
i = Map(String())
使用枚举
class Example(Record):
a = String()
b = Integer()
c = Array(String())
i = Map(String())
复杂类型
class MySubRecord(Record):
x = Integer()
y = Long()
z = String()
class Example(Record):
a = String()
sub = MySubRecord()