Skip to main content

Java client (V5)

The V5 Java client is a client SDK built for scalable topics. It offers three purpose-built consumer types and a modern, sync-first API, and it also works against existing partitioned and non-partitioned topics -- so you can adopt it before migrating any topic.

For how it compares to the current Java client, see Java client SDKs. For the messaging model and a deeper API walkthrough, see Messaging.

note

The V5 client requires Java 17. Scalable-topic support in the other language SDKs is planned; today the V5 client is available in Java.

Install

The V5 client is published to Maven Central as pulsar-client-v5 (available from 4.2.2). The API lives under the org.apache.pulsar.client.api.v5 package.

Maven

<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-v5</artifactId>
<version>4.2.2</version>
</dependency>

Gradle

dependencies {
implementation "org.apache.pulsar:pulsar-client-v5:4.2.2"
}

Create a client

Build one PulsarClient and share it across producers and consumers; close it on shutdown.

import org.apache.pulsar.client.api.v5.PulsarClient;

PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();

// ... create producers and consumers ...

client.close();

The service URL uses the pulsar:// (or pulsar+ssl://) scheme. Authentication, TLS, operation timeouts, and memory limits are set on the same builder.

Produce messages

Create a producer with a schema and a topic, then build and send messages:

Producer<String> producer = client.newProducer(Schema.string())
.topic("topic://public/default/orders")
.create();

producer.newMessage()
.key("user-123")
.value("order placed")
.send();

Send asynchronously through async():

var async = producer.async();
async.newMessage().value("order placed").send()
.thenAccept(id -> System.out.println("sent " + id));
async.flush().join();

Consume messages

The V5 client replaces the four classic subscription types with three consumer types. Choose one by how you intend to consume; see Consumers for the full semantics.

Stream consumer

Ordered consumption with cumulative acknowledgment:

StreamConsumer<String> consumer = client.newStreamConsumer(Schema.string())
.topic("topic://public/default/orders")
.subscriptionName("my-sub")
.subscribe();

Message<String> msg = consumer.receive(Duration.ofSeconds(5)); // null on timeout
if (msg != null) {
process(msg.value());
consumer.acknowledgeCumulative(msg.id());
}

Queue consumer

Parallel consumption with individual acknowledgment, negative acknowledgment, and dead-letter support:

QueueConsumer<String> consumer = client.newQueueConsumer(Schema.string())
.topic("topic://public/default/orders")
.subscriptionName("workers")
.subscribe();

Message<String> msg = consumer.receive(Duration.ofSeconds(5));
if (msg != null) {
try {
process(msg.value());
consumer.acknowledge(msg.id());
} catch (Exception e) {
consumer.negativeAcknowledge(msg.id());
}
}

Checkpoint consumer

For stream-processing engines that track their own position -- no subscription and no acknowledgment. Capture a serializable Checkpoint and restore from it:

CheckpointConsumer<String> consumer = client.newCheckpointConsumer(Schema.string())
.topic("topic://public/default/orders")
.startPosition(Checkpoint.earliest()) // earliest(), latest(), or a saved checkpoint
.create();

Message<String> msg = consumer.receive(Duration.ofSeconds(5));
byte[] state = consumer.checkpoint().toByteArray(); // persist externally
// resume later with: .startPosition(Checkpoint.fromByteArray(state))

Schemas

Pass a schema when creating any producer or consumer. The V5 schema factories are lowercase methods:

Schema.string() // String
Schema.json(Order.class) // JSON-encoded POJO
Schema.avro(Order.class) // Avro-encoded POJO

Primitive factories (Schema.int32(), Schema.bool(), Schema.bytes(), …) and Schema.protobuf(...) are also available.

Transactions

Produce messages and acknowledge consumed messages atomically. Bind a produce with .transaction(txn) on the message builder and an acknowledgment with the two-argument acknowledge:

Transaction txn = client.newTransaction();
try {
producer.newMessage().transaction(txn).value(result).send();
consumer.acknowledge(msg.id(), txn);
txn.commit();
} catch (Exception e) {
txn.abort();
}

What's next