The Pulsar Java client
The Pulsar Java client can be used to create Java producers, consumers, and readers of messages, and to perform administrative tasks. The current version of the Java client is 2.4.2.
Javadoc for the Pulsar client is divided into two domains by package:
Package | Description | Maven Artifact |
---|---|---|
org.apache.pulsar.client.api | The producer and consumer API | org.apache.pulsar:pulsar-client:2.4.2 |
org.apache.pulsar.client.admin | The Java admin API | org.apache.pulsar:pulsar-client-admin:2.4.2 |
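This document focuses only on the client API for producing and consuming messages on Pulsar topics. For how to use the Java admin client, see Pulsar admin interface.
Installation
The latest version of the Pulsar Java client library is available via Maven Central. To use the latest version, add the pulsar-client library to your build configuration.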
Maven
If you're using Maven, add the following to your pom.xml file:
<!-- in your <properties> block -->
<pulsar.version>2.4.2</pulsar.version>
<!-- in your <dependencies> block -->
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>${pulsar.version}</version>
</dependency>
Gradle
If you're using Gradle, add the following to your build.gradle file:
def pulsarVersion = '2.4.2'
dependencies {
implementation group: 'org.apache.pulsar', name: 'pulsar-client', version: pulsarVersion
}
Connection URLs
To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL.
Pulsar protocol URLs are assigned to specific clusters, use the pulsar scheme, and have a default port of 6650. Here's an example for localhost:
pulsar://localhost:6650
If you have multiple brokers, the URL looks like this:
pulsar://localhost:6650,localhost:6651,localhost:6652
A URL for a production Pulsar cluster looks something like this:
pulsar://pulsar.us-west.example.com:6650
If you're using TLS authentication, the URL looks something like this:
pulsar+ssl://pulsar.us-west.example.com:6651
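To make the multi-broker URL format above concrete, here is a minimal JDK-only sketch of how such a URL could be split into host:port pairs. ServiceUrls and brokerAddresses are hypothetical names; the Pulsar client does this parsing internally. The sketch assumes the default ports 6650 for pulsar:// and 6651 for pulsar+ssl:// (the port used in the TLS example), and it does not handle IPv6 literals.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only: the Pulsar client parses
// service URLs internally and does not expose a class like this.
final class ServiceUrls {
    private ServiceUrls() {}

    /** Splits "scheme://host1[:port],host2[:port],..." into "host:port" entries. */
    static List<String> brokerAddresses(String serviceUrl) {
        int sep = serviceUrl.indexOf("://");
        if (sep < 0) {
            throw new IllegalArgumentException("Missing scheme: " + serviceUrl);
        }
        String scheme = serviceUrl.substring(0, sep);
        // Assumed defaults: 6650 for pulsar://, 6651 for pulsar+ssl://.
        final int defaultPort;
        switch (scheme) {
            case "pulsar":
                defaultPort = 6650;
                break;
            case "pulsar+ssl":
                defaultPort = 6651;
                break;
            default:
                throw new IllegalArgumentException("Unsupported scheme: " + scheme);
        }
        String hosts = serviceUrl.substring(sep + 3);
        int slash = hosts.indexOf('/');
        if (slash >= 0) {
            hosts = hosts.substring(0, slash); // ignore any trailing path
        }
        List<String> addresses = new ArrayList<>();
        for (String entry : hosts.split(",")) {
            String host = entry.trim();
            if (host.isEmpty()) {
                continue;
            }
            // Hosts without an explicit port get the scheme's default port.
            addresses.add(host.contains(":") ? host : host + ":" + defaultPort);
        }
        return addresses;
    }
}
```

For example, brokerAddresses("pulsar://pulsar.us-west.example.com") yields the single entry pulsar.us-west.example.com:6650.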
Client configuration
You can instantiate a PulsarClient object using just a URL for the target Pulsar cluster, like this:
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
If you have multiple brokers, you can initialize a PulsarClient like this:
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650,localhost:6651,localhost:6652")
.build();
Default broker URL for standalone clusters: if you run a cluster in standalone mode, the broker is available at pulsar://localhost:6650 by default.
When you create a client, you can use the loadConf configuration. The following parameters are available in loadConf:
Type | Name | Description | Default |
---|---|---|---|
String | serviceUrl | Service URL of the Pulsar service | None |
String | authPluginClassName | Name of the authentication plugin | None |
String | authParams | Parameters for the authentication plugin, as a single string. Example: key1:val1,key2:val2 | None |
long | operationTimeoutMs | Operation timeout (ms) | 30000 |
long | statsIntervalSeconds | Interval between stats reports (seconds). Stats are enabled only when statsIntervalSeconds is positive, and it should be set to at least 1 second | 60 |
int | numIoThreads | Number of threads used for handling connections to brokers | 1 |
int | numListenerThreads | Number of threads used for handling message listeners | 1 |
boolean | useTcpNoDelay | Whether to use the TCP no-delay flag on the connection to disable Nagle's algorithm | true |
boolean | useTls | Whether to use TLS encryption on the connection | false |
String | tlsTrustCertsFilePath | Path to the trusted TLS certificate file | None |
boolean | tlsAllowInsecureConnection | Whether the Pulsar client accepts an untrusted TLS certificate from the broker | false |
boolean | tlsHostnameVerificationEnable | Whether to enable TLS hostname verification | false |
int | concurrentLookupRequest | Number of concurrent lookup requests allowed on each broker connection, to prevent overloading the broker | 5000 |
int | maxLookupRequest | Maximum number of lookup requests allowed on each broker connection, to prevent overloading the broker | 50000 |
int | maxNumberOfRejectedRequestPerConnection | Maximum number of requests a broker may reject within a certain time frame (30 seconds); once exceeded, the current connection is closed and the client creates a new connection to a different broker | 50 |
int | keepAliveIntervalSeconds | Keep-alive interval for each client-broker connection (seconds) | 30 |
int | connectionTimeoutMs | How long to wait for a connection to a broker to be established (ms). If the duration passes without a response from the broker, the connection attempt is dropped | 10000 |
int | requestTimeoutMs | Maximum duration for completing a request (ms) | 60000 |
int | defaultBackoffIntervalNanos | Default duration for a backoff interval | TimeUnit.MILLISECONDS.toNanos(100) |
long | maxBackoffIntervalNanos | Maximum duration for a backoff interval | TimeUnit.SECONDS.toNanos(30) |
For a full list of configuration parameters, see the Javadoc for the PulsarClient class.
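The authParams value in the table above is a single string of key:value pairs separated by commas. The hypothetical AuthParamsParser below (not part of the Pulsar client) sketches how such a string maps onto key/value pairs. It assumes values never contain commas and splits each pair on its first colon only, so values such as file URLs survive intact.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of the Pulsar client API.
final class AuthParamsParser {
    private AuthParamsParser() {}

    /** Parses "key1:val1,key2:val2" into an insertion-ordered map. */
    static Map<String, String> parse(String authParams) {
        Map<String, String> params = new LinkedHashMap<>();
        if (authParams == null || authParams.trim().isEmpty()) {
            return params;
        }
        for (String pair : authParams.split(",")) {
            // Split on the first ':' only, so "file:///key.pem" stays intact.
            int colon = pair.indexOf(':');
            if (colon <= 0) {
                throw new IllegalArgumentException("Malformed pair: " + pair);
            }
            params.put(pair.substring(0, colon).trim(), pair.substring(colon + 1).trim());
        }
        return params;
    }
}
```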
Producers
In Pulsar, producers write messages to topics. Once you've instantiated a PulsarClient object (as in the section above), you can create a Producer for a specific Pulsar topic.
Producer<byte[]> producer = client.newProducer()
.topic("my-topic")
.create();
// You can then send messages to the broker and topic you specified:
producer.send("My message".getBytes());
By default, producers produce messages that consist of byte arrays. You can produce different types, however, by specifying a message schema.
Producer<String> stringProducer = client.newProducer(Schema.STRING)
.topic("my-topic")
.create();
stringProducer.send("My message");
You should always make sure to close your producers, consumers, and clients when they are no longer needed:
producer.close();
consumer.close();
client.close();
Close operations can also be asynchronous:
producer.closeAsync()
.thenRun(() -> System.out.println("Producer closed"))
.exceptionally((ex) -> {
System.err.println("Failed to close producer: " + ex);
return null;
});
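Configuring producers
If you instantiate a Producer object by specifying only a topic name, as in the example above, the producer uses the default configuration. To use a non-default configuration, you can set a variety of configurable parameters.
For a full listing, see the Javadoc for the ProducerBuilder class. Here's an example: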
Producer<byte[]> producer = client.newProducer()
.topic("my-topic")
.batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
.sendTimeout(10, TimeUnit.SECONDS)
.blockIfQueueFull(true)
.create();
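Message routing
When using partitioned topics, you can specify the routing mode whenever you publish messages using a producer. For more on specifying a routing mode using the Java client, see the Partitioned Topics cookbook.
Async send
You can also publish messages asynchronously using the Java client. With async send, the producer puts the message in a blocking queue and returns immediately. The client library then sends the message to the broker in the background. If the queue is full (its maximum size is configurable with maxPendingMessages), the call either blocks or fails immediately, depending on the blockIfQueueFull setting of the producer.
Here's an example async send operation: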
producer.sendAsync("my-async-message".getBytes()).thenAccept(msgId -> {
System.out.printf("Message with ID %s successfully sent", msgId);
});
As you can see from the example above, async send operations return a MessageId wrapped in a CompletableFuture.
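The blocking-queue behavior described above can be modeled with the JDK alone. The sketch below is a hypothetical model, not Pulsar's implementation. An ArrayBlockingQueue plays the producer's pending-message queue, and a blockIfQueueFull flag chooses between put() (wait for space) and offer() (fail fast). ackOldest() stands in for the background sender completing each future once the broker acknowledges the message, and a Long sequence number stands in for the MessageId.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of an async producer's pending-message queue; not Pulsar code.
final class PendingQueueModel {
    /** A queued message paired with the future handed back to the caller. */
    private static final class Pending {
        final byte[] payload;
        final CompletableFuture<Long> future = new CompletableFuture<>();

        Pending(byte[] payload) {
            this.payload = payload;
        }
    }

    private final BlockingQueue<Pending> queue;
    private final boolean blockIfQueueFull;
    private long nextSequenceId = 0;

    PendingQueueModel(int maxPendingMessages, boolean blockIfQueueFull) {
        this.queue = new ArrayBlockingQueue<>(maxPendingMessages);
        this.blockIfQueueFull = blockIfQueueFull;
    }

    /** Enqueues the message and returns without waiting for the "broker". */
    CompletableFuture<Long> sendAsync(byte[] payload) {
        Pending pending = new Pending(payload);
        if (blockIfQueueFull) {
            try {
                queue.put(pending); // waits until there is free space
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                pending.future.completeExceptionally(e);
            }
        } else if (!queue.offer(pending)) {
            // Queue full and blocking disabled: fail right away.
            pending.future.completeExceptionally(new IllegalStateException("Producer queue is full"));
        }
        return pending.future;
    }

    /** Plays the background sender: "acknowledges" the oldest queued message. */
    boolean ackOldest() {
        Pending pending = queue.poll();
        if (pending == null) {
            return false;
        }
        pending.future.complete(nextSequenceId++);
        return true;
    }
}
```

With a capacity of 2 and blockIfQueueFull set to false, a third sendAsync call returns a future that has already completed exceptionally, while the first two futures stay pending until ackOldest() is called.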
Configuring messages
In addition to a value, you can set additional items on a given message:
producer.newMessage()
.key("my-message-key")
.value("my-async-message".getBytes())
.property("my-key", "my-value")
.property("my-other-key", "my-other-value")
.send();
In the example above, you can also terminate the builder chain with sendAsync() instead of send() to get a future back.
Consumers
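In Pulsar, consumers subscribe to topics and handle messages that producers publish to those topics. You can instantiate a new consumer by first instantiating a PulsarClient object and passing it a URL for a Pulsar broker (as above).
Once you've instantiated a PulsarClient object, you can create a Consumer by specifying a topic and a subscription.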
Consumer consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscribe();
The subscribe method automatically subscribes the consumer to the specified topic and subscription. One way to make the consumer listen on the topic is to set up a while loop. In this example loop, the consumer listens for messages, prints the contents of any message that's received, and then acknowledges that the message has been processed. If the processing logic fails, we use negative acknowledgement to have the message redelivered at a later point in time.
while (true) {
// Wait for a message
Message msg = consumer.receive();
try {
// Do something with the message
System.out.printf("Message received: %s", new String(msg.getData()));
// Acknowledge the message so that it can be deleted by the message broker
consumer.acknowledge(msg);
} catch (Exception e) {
// Message failed to process, redeliver later
consumer.negativeAcknowledge(msg);
}
}
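Configuring consumers
If you instantiate a Consumer object by specifying only a topic and subscription name, as in the example above, the consumer uses the default configuration. To use a non-default configuration, you can set a variety of configurable parameters. For a full listing, see the Javadoc for the ConsumerBuilder class. Here's an example configuration: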
Consumer consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.ackTimeout(10, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
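Async receive
The receive method receives messages synchronously (the consumer process is blocked until a message is available). You can also use async receive, which returns a CompletableFuture object immediately; the future completes once a new message is available.
Here's an example: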
CompletableFuture<Message> asyncMessage = consumer.receiveAsync();
Async receive operations return a Message wrapped inside of a CompletableFuture.
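Multi-topic subscriptions
In addition to subscribing a consumer to a single Pulsar topic, you can also subscribe to multiple topics simultaneously using multi-topic subscriptions. To use multi-topic subscriptions, you can supply either a regular expression (regex) or a List of topics. If you select topics via regex, all topics must be within the same Pulsar namespace.
Here are some examples: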
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer()
.subscriptionName(subscription);
// Subscribe to all topics in a namespace
Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default/.*");
Consumer<byte[]> allTopicsConsumer = consumerBuilder
.topicsPattern(allTopicsInNamespace)
.subscribe();
// Subscribe to a subset of topics in a namespace, based on regex
Pattern someTopicsInNamespace = Pattern.compile("persistent://public/default/foo.*");
Consumer<byte[]> someTopicsConsumer = consumerBuilder
.topicsPattern(someTopicsInNamespace)
.subscribe();
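A topic pattern is an ordinary java.util.regex.Pattern. The hypothetical helper below sketches only the filtering step. It assumes the candidate topic names are already known and that a topic is selected when the whole name matches the pattern; it does not model how the client discovers the topics in a namespace.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper: keeps the topic names that the whole pattern matches.
final class TopicPatternDemo {
    private TopicPatternDemo() {}

    static List<String> matching(Pattern pattern, List<String> topicNames) {
        return topicNames.stream()
                .filter(name -> pattern.matcher(name).matches()) // full-string match, not find()
                .collect(Collectors.toList());
    }
}
```

For example, consider the names persistent://public/default/foo-1, persistent://public/default/bar and persistent://public/default-2/foo-1. The pattern persistent://public/default/.* selects the first two. The pattern persistent://public/default.* (without the slash) selects all three, which is why the trailing slash matters.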
You can also subscribe to an explicit list of topics (across namespaces if you wish):
List<String> topics = Arrays.asList(
"topic-1",
"topic-2",
"topic-3"
);
Consumer<byte[]> multiTopicConsumer = consumerBuilder
.topics(topics)
.subscribe();
// Alternatively, pass the topic names as varargs:
Consumer<byte[]> multiTopicConsumer2 = consumerBuilder
.topic(
"topic-1",
"topic-2",
"topic-3"
)
.subscribe();
You can also subscribe to multiple topics asynchronously using the subscribeAsync method rather than the synchronous subscribe method. Here's an example:
Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default/.*");
consumerBuilder
.topicsPattern(allTopicsInNamespace)
.subscribeAsync()
.thenAccept(this::receiveMessageFromConsumer);
private void receiveMessageFromConsumer(Consumer<byte[]> consumer) {
consumer.receiveAsync().thenAccept(message -> {
// Do something with the received message, then acknowledge it
consumer.acknowledgeAsync(message);
// Request the next message
receiveMessageFromConsumer(consumer);
});
}
Subscription modes
Pulsar has various subscription modes to match different scenarios. A topic can have multiple subscriptions with different subscription modes. However, a subscription can only have one subscription mode at a time.
A subscription is identified by its subscription name, and a subscription name can specify only one subscription mode at a time. To change the subscription mode, you must first take all existing consumers of this subscription offline.
Different subscription modes have different message distribution behavior. This section describes the differences between the subscription modes and how to use them.
To better illustrate the differences, assume you have a topic named "my-topic" and the producer has published 10 messages.
Producer<String> producer = client.newProducer(Schema.STRING)
.topic("my-topic")
.enableBatching(false)
.create();
// 3 messages with "key-1", 3 messages with "key-2", 2 messages with "key-3" and 2 messages with "key-4"
producer.newMessage().key("key-1").value("message-1-1").send();
producer.newMessage().key("key-1").value("message-1-2").send();
producer.newMessage().key("key-1").value("message-1-3").send();
producer.newMessage().key("key-2").value("message-2-1").send();
producer.newMessage().key("key-2").value("message-2-2").send();
producer.newMessage().key("key-2").value("message-2-3").send();
producer.newMessage().key("key-3").value("message-3-1").send();
producer.newMessage().key("key-3").value("message-3-2").send();
producer.newMessage().key("key-4").value("message-4-1").send();
producer.newMessage().key("key-4").value("message-4-2").send();
Exclusive
Create a new consumer and subscribe with the Exclusive subscription mode.
Consumer consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
Only the first consumer is allowed to attach to the subscription; other consumers receive an error. The first consumer receives all 10 messages, and the consuming order is the same as the producing order.
Note:
If the topic is a partitioned topic, the first consumer subscribes to all partitions; other consumers are not assigned any partitions and receive an error.
Failover
Create new consumers and subscribe with the Failover subscription mode.
Consumer consumer1 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Failover)
.subscribe();
Consumer consumer2 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Failover)
.subscribe();
// consumer1 is the active consumer, consumer2 is the standby consumer.
// consumer1 receives 5 messages and then crashes; consumer2 takes over as the active consumer.
Multiple consumers can attach to the same subscription, yet only the first consumer is active and the others are standby. When the active consumer is disconnected, messages are dispatched to one of the standby consumers, which then becomes the active consumer.
If the first active consumer receives 5 messages and is then disconnected, the standby consumer becomes the active consumer. consumer1 will receive:
("key-1", "message-1-1")
("key-1", "message-1-2")
("key-1", "message-1-3")
("key-2", "message-2-1")
("key-2", "message-2-2")
consumer2 will receive:
("key-2", "message-2-3")
("key-3", "message-3-1")
("key-3", "message-3-2")
("key-4", "message-4-1")
("key-4", "message-4-2")
Note:
If the topic is a partitioned topic, each partition has only one active consumer: the messages of one partition are distributed to only one consumer, while the messages of multiple partitions are distributed across multiple consumers.
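The Failover walkthrough above can be reproduced with a small JDK-only simulation. FailoverDemo is hypothetical and models only a non-partitioned topic. It assumes each active consumer disconnects after receiving a fixed number of messages, and that the next standby, in attach order, then takes over.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of Failover dispatch on a non-partitioned topic.
final class FailoverDemo {
    private FailoverDemo() {}

    /**
     * Consumers are numbered in attach order; only one is active at a time.
     * Assumption: each active consumer disconnects after receiving crashAfter
     * messages and the next standby takes over. The last one never disconnects.
     */
    static List<List<String>> dispatch(List<String> messages, int consumers, int crashAfter) {
        List<List<String>> received = new ArrayList<>();
        for (int i = 0; i < consumers; i++) {
            received.add(new ArrayList<>());
        }
        int active = 0;
        for (String message : messages) {
            if (received.get(active).size() == crashAfter && active < consumers - 1) {
                active++; // active consumer is gone; promote the next standby
            }
            received.get(active).add(message);
        }
        return received;
    }
}
```

Calling dispatch with the 10 messages above, 2 consumers, and a crash point of 5 gives consumer1 the first five messages and consumer2 the remaining five, matching the lists above.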
Shared
Create new consumers and subscribe with the Shared subscription mode:
Consumer consumer1 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.subscribe();
Consumer consumer2 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.subscribe();
// Both consumer1 and consumer2 are active consumers.
In Shared subscription mode, multiple consumers can attach to the same subscription, and messages are delivered in a round-robin distribution across consumers.
If a broker dispatches only one message at a time, consumer1 will receive:
("key-1", "message-1-1")
("key-1", "message-1-3")
("key-2", "message-2-2")
("key-3", "message-3-1")
("key-4", "message-4-1")
consumer2 will receive:
("key-1", "message-1-2")
("key-2", "message-2-1")
("key-2", "message-2-3")
("key-3", "message-3-2")
("key-4", "message-4-2")
The Shared subscription mode differs from the Exclusive and Failover modes: it offers more flexibility but cannot guarantee message ordering.
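The Shared distribution listed above follows from strict round-robin dispatch of one message at a time. This hypothetical simulation assumes exactly that. A real broker typically dispatches according to each consumer's available permits, so the actual split can differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of Shared dispatch, one message at a time, strict round robin.
final class SharedDemo {
    private SharedDemo() {}

    static List<List<String>> roundRobin(List<String> messages, int consumers) {
        List<List<String>> received = new ArrayList<>();
        for (int i = 0; i < consumers; i++) {
            received.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            // Message i goes to consumer i mod n, regardless of its key.
            received.get(i % consumers).add(messages.get(i));
        }
        return received;
    }
}
```

With the 10 messages above and 2 consumers, roundRobin returns exactly the two lists shown. It also illustrates the lost ordering: message-1-1 and message-1-2 end up on different consumers.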
Key_Shared
Key_Shared is a new subscription mode introduced in the 2.4.0 release. Create new consumers and subscribe with the Key_Shared subscription mode:
Consumer consumer1 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Key_Shared)
.subscribe();
Consumer consumer2 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Key_Shared)
.subscribe();
// Both consumer1 and consumer2 are active consumers.
Key_Shared subscription is like Shared subscription in that all consumers can attach to the same subscription. Unlike Shared subscription, however, messages with the same key are delivered in order to only one consumer. The following is one possible distribution of messages between consumers. By default, you cannot know in advance which keys will be assigned to which consumer, but at any given time a key is assigned to only one consumer.
consumer1 will receive:
("key-1", "message-1-1")
("key-1", "message-1-2")
("key-1", "message-1-3")
("key-3", "message-3-1")
("key-3", "message-3-2")
consumer2 will receive:
("key-2", "message-2-1")
("key-2", "message-2-2")
("key-2", "message-2-3")
("key-4", "message-4-1")
("key-4", "message-4-2")
Note:
If the message key is not specified, messages without a key are dispatched in order to a single consumer by default.
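Key_Shared routing can be sketched as "hash the key, pick a consumer". The simulation below is hypothetical and deliberately simplified. It uses String.hashCode() modulo the number of consumers, whereas the broker uses its own hash-range assignment. It also maps keyless messages to a single stand-in key, so they all land on one consumer, as the note describes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of Key_Shared dispatch; the broker's real key-to-consumer
// assignment uses its own hashing scheme, not String.hashCode().
final class KeySharedDemo {
    // Hypothetical stand-in key so that all keyless messages share one consumer.
    private static final String NO_KEY = "";

    private KeySharedDemo() {}

    static int consumerFor(String key, int consumers) {
        String effectiveKey = (key == null) ? NO_KEY : key;
        return Math.floorMod(effectiveKey.hashCode(), consumers);
    }

    /** keys.get(i) is the key of values.get(i); each consumer's list keeps publish order. */
    static List<List<String>> dispatch(List<String> keys, List<String> values, int consumers) {
        List<List<String>> received = new ArrayList<>();
        for (int i = 0; i < consumers; i++) {
            received.add(new ArrayList<>());
        }
        for (int i = 0; i < values.size(); i++) {
            received.get(consumerFor(keys.get(i), consumers)).add(values.get(i));
        }
        return received;
    }
}
```

Whatever the hash values turn out to be, all messages with the same key end up in the same consumer's list, in publish order.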
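Reader interface
With the reader interface, Pulsar clients can "manually position" themselves within a topic and read all messages from a specified message onward. The Pulsar API for Java enables you to create Reader objects by specifying a topic and a MessageId on the builder returned by newReader().
Here's an example: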
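// msgIdBytes holds a serialized message ID that your application stored earlier,
// for example by calling toByteArray() on a MessageId
byte[] msgIdBytes = loadSavedMessageId(); // hypothetical helper, not a Pulsar API
MessageId id = MessageId.fromByteArray(msgIdBytes);
Reader<byte[]> reader = pulsarClient.newReader()
.topic(topic)
.startMessageId(id)
.create();
while (true) {
Message<byte[]> message = reader.readNext();
// Process the message
}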
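In the example above, a Reader object is instantiated for a specific topic and message (by ID). The reader iterates over each message in the topic after the message identified by msgIdBytes (how that value is obtained depends on the application).
The code sample above shows pointing the Reader object to a specific message (by ID), but you can also use MessageId.earliest to point to the earliest available message on the topic, or MessageId.latest to point to the most recent message.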
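Reader positioning can be illustrated with an in-memory list whose indexes stand in for message IDs. InMemoryReader is a hypothetical model, not the Pulsar Reader API. A specific start position reads from the next message onward, EARLIEST reads from the first message, and LATEST sees only messages appended after the reader is created. Unlike a real reader, readNext() here throws instead of blocking when nothing is available.

```java
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical in-memory model of reader positioning. List indexes stand in
// for MessageIds; EARLIEST and LATEST stand in for MessageId.earliest / latest.
final class InMemoryReader {
    static final long EARLIEST = -1L;
    static final long LATEST = Long.MAX_VALUE;

    private final List<String> topicLog;
    private int next;

    /** Starts after startPosition; EARLIEST reads from the first message, LATEST only sees new ones. */
    InMemoryReader(List<String> topicLog, long startPosition) {
        this.topicLog = topicLog;
        if (startPosition == LATEST) {
            next = topicLog.size();
        } else if (startPosition == EARLIEST) {
            next = 0;
        } else {
            next = (int) startPosition + 1; // the start message itself is skipped
        }
    }

    boolean hasMessageAvailable() {
        return next < topicLog.size();
    }

    /** Unlike a real reader, this does not block when no message is available. */
    String readNext() {
        if (!hasMessageAvailable()) {
            throw new NoSuchElementException("No message available");
        }
        return topicLog.get(next++);
    }
}
```

For a log of m0, m1, m2, a reader started at position 0 first returns m1, a reader started at EARLIEST first returns m0, and a reader started at LATEST has nothing to read until a new message is appended.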
Schema
In Pulsar, all message data consists of byte arrays "under the hood." Message schemas enable you to use other types of data when constructing and handling messages (from simple types like strings to more complex, application-specific types). If you construct, say, a producer without specifying a schema, then the producer can only produce messages of type byte[]. Here's an example:
Producer<byte[]> producer = client.newProducer()
.topic(topic)
.create();
The producer above is equivalent to a Producer<byte[]> (in fact, you should always explicitly specify the type). If you'd like to use a producer for a different type of data, you need to specify a schema that informs Pulsar which data type will be transmitted over the topic.
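Since everything on the wire is byte[], a schema boils down to a pair of conversions between a Java type and bytes. The hypothetical SimpleSchema interface below sketches that idea with the JDK only. Pulsar's own Schema interface also has encode and decode methods, but this is not its implementation. STRING uses UTF-8, matching the description of Schema.STRING later in this section.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical minimal schema abstraction: every value travels as byte[].
interface SimpleSchema<T> {
    byte[] encode(T value);

    T decode(byte[] bytes);

    // Identity conversion, like producing raw byte arrays without a schema.
    SimpleSchema<byte[]> BYTES = new SimpleSchema<byte[]>() {
        public byte[] encode(byte[] value) {
            return value;
        }

        public byte[] decode(byte[] bytes) {
            return bytes;
        }
    };

    // UTF-8 text encoding for String values.
    SimpleSchema<String> STRING = new SimpleSchema<String>() {
        public byte[] encode(String value) {
            return value.getBytes(StandardCharsets.UTF_8);
        }

        public String decode(byte[] bytes) {
            return new String(bytes, StandardCharsets.UTF_8);
        }
    };
}
```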
Schema example
Let's say that you have a SensorReading class that you'd like to transmit over a Pulsar topic:
public class SensorReading {
public float temperature;
public SensorReading(float temperature) {
this.temperature = temperature;
}
// A no-arg constructor is required
public SensorReading() {
}
public float getTemperature() {
return temperature;
}
public void setTemperature(float temperature) {
this.temperature = temperature;
}
}
You could then create a Producer<SensorReading> (or Consumer<SensorReading>) like this:
Producer<SensorReading> producer = client.newProducer(JSONSchema.of(SensorReading.class))
.topic("sensor-readings")
.create();
The following schema formats are currently available for Java:
No schema, or the byte-array schema (which can be applied using Schema.BYTES):
Producer<byte[]> bytesProducer = client.newProducer(Schema.BYTES)
.topic("some-raw-bytes-topic")
.create();
Or, equivalently:
Producer<byte[]> bytesProducer = client.newProducer()
.topic("some-raw-bytes-topic")
.create();
String for normal UTF-8-encoded string data. This schema can be applied using Schema.STRING:
Producer<String> stringProducer = client.newProducer(Schema.STRING)
.topic("some-string-topic")
.create();
JSON schemas can be created for POJOs using Schema.JSON. Here's an example:
Producer<MyPojo> pojoProducer = client.newProducer(Schema.JSON(MyPojo.class))
.topic("some-pojo-topic")
.create();
Protobuf schemas can be generated using Schema.PROTOBUF. The following example shows how to create the Protobuf schema and use it to instantiate a new producer:
Producer<MyProtobuf> protobufProducer = client.newProducer(Schema.PROTOBUF(MyProtobuf.class))
.topic("some-protobuf-topic")
.create();
Avro schemas can be defined with the help of Schema.AVRO. The next code snippet demonstrates the creation and usage of the Avro schema:
Producer<MyAvro> avroProducer = client.newProducer(Schema.AVRO(MyAvro.class))
.topic("some-avro-topic")
.create();
Authentication
Pulsar supports several authentication schemes. This section covers TLS and Athenz, both of which can be used with the Pulsar Java client.
TLS Authentication
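To use TLS, enable TLS with the enableTls method, point your Pulsar client to a trusted TLS certificate path, and provide paths to the certificate and key files.
Here's an example configuration: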
Map<String, String> authParams = new HashMap<>();
authParams.put("tlsCertFile", "/path/to/client-cert.pem");
authParams.put("tlsKeyFile", "/path/to/client-key.pem");
Authentication tlsAuth = AuthenticationFactory
.create(AuthenticationTls.class.getName(), authParams);
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar+ssl://my-broker.com:6651")
.enableTls(true)
.tlsTrustCertsFilePath("/path/to/cacert.pem")
.authentication(tlsAuth)
.build();
Athenz
To use Athenz as an authentication provider, you need to use TLS and provide values for four parameters in a hash:
tenantDomain
tenantService
providerDomain
privateKey
You can also set an optional keyId. Here's an example configuration:
Map<String, String> authParams = new HashMap<>();
authParams.put("tenantDomain", "shopping"); // Tenant domain name
authParams.put("tenantService", "some_app"); // Tenant service name
authParams.put("providerDomain", "pulsar"); // Provider domain name
authParams.put("privateKey", "file:///path/to/private.pem"); // Tenant private key path
authParams.put("keyId", "v1"); // Key id for the tenant private key (optional, default: "0")
Authentication athenzAuth = AuthenticationFactory
.create(AuthenticationAthenz.class.getName(), authParams);
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar+ssl://my-broker.com:6651")
.enableTls(true)
.tlsTrustCertsFilePath("/path/to/cacert.pem")
.authentication(athenzAuth)
.build();
Supported formats:
The privateKey parameter supports the following three formats:
* file:///path/to/file
* file:/path/to/file
* data:application/x-pem-file;base64,<base64-encoded value>
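The three privateKey formats differ only in how the key material is located. The hypothetical PrivateKeyRef helper below (not the Athenz plugin's code) sketches how the two file: forms resolve to the same path, and how a data: URL carries the PEM text inline as Base64.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper illustrating the three privateKey formats; not the Athenz plugin's code.
final class PrivateKeyRef {
    static final String DATA_PREFIX = "data:application/x-pem-file;base64,";

    private PrivateKeyRef() {}

    /** Returns the local file path for file: URLs, or null for any other form. */
    static String filePath(String privateKey) {
        if (privateKey.startsWith("file://")) {
            return privateKey.substring("file://".length()); // "file:///a/b" -> "/a/b"
        }
        if (privateKey.startsWith("file:")) {
            return privateKey.substring("file:".length()); // "file:/a/b" -> "/a/b"
        }
        return null;
    }

    /** Decodes the PEM text carried inline by a data: URL. */
    static String inlinePem(String privateKey) {
        if (!privateKey.startsWith(DATA_PREFIX)) {
            throw new IllegalArgumentException("Not a base64 PEM data URL");
        }
        byte[] decoded = Base64.getDecoder().decode(privateKey.substring(DATA_PREFIX.length()));
        return new String(decoded, StandardCharsets.US_ASCII);
    }
}
```

Both file:///path/to/file and file:/path/to/file resolve to /path/to/file.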