Version: 3.3.x

Use a Java client

Create a producer

Once you've instantiated a PulsarClient object, you can create a Producer for a specific Pulsar topic.

Producer<byte[]> producer = client.newProducer()
        .topic("my-topic")
        .create();

// You can then send messages to the broker and topic you specified:
producer.send("My message".getBytes());

By default, producers produce messages that consist of byte arrays. You can produce different types by specifying a message schema.

Producer<String> stringProducer = client.newProducer(Schema.STRING)
        .topic("my-topic")
        .create();
stringProducer.send("My message");
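
If you stay with raw byte arrays rather than Schema.STRING, it can help to make the text encoding explicit. The byte-array examples on this page call getBytes() and new String(...) without a charset, which falls back to the JVM default and may differ between machines on older Java versions. The following is a minimal sketch, assuming UTF-8 is the encoding you want; the PayloadEncoding class name is hypothetical and not part of the Pulsar API.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not part of Pulsar): fixes the payload encoding to UTF-8.
public final class PayloadEncoding {
    private PayloadEncoding() {}

    /** Encodes text as UTF-8 bytes, suitable as a Producer<byte[]> payload. */
    public static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a UTF-8 payload, for example the array returned by msg.getData(). */
    public static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }
}
```

With this helper, the producer call would become producer.send(PayloadEncoding.encode("My message")), and a consumer would decode with PayloadEncoding.decode(msg.getData()).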

Make sure that you close your producers, consumers, and clients when you do not need them.

producer.close();
consumer.close();
client.close();

Close operations can also be asynchronous:

producer.closeAsync()
        .thenRun(() -> System.out.println("Producer closed"))
        .exceptionally((ex) -> {
            System.err.println("Failed to close producer: " + ex);
            return null;
        });
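
When several objects are closed asynchronously, you may want the client to close only after its producers and consumers have finished closing. The following is a minimal sketch of one way to chain the returned futures in order, using only java.util.concurrent; the AsyncCloseOrder class and closeInOrder method are hypothetical names, not part of the Pulsar API.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical helper (not part of Pulsar): runs asynchronous close steps one after another.
public final class AsyncCloseOrder {
    private AsyncCloseOrder() {}

    /**
     * Starts each step only after the previous one has completed successfully.
     * If a step fails, the remaining steps are skipped and the returned future
     * completes exceptionally with that failure.
     */
    public static CompletableFuture<Void> closeInOrder(
            List<Supplier<CompletableFuture<Void>>> steps) {
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (Supplier<CompletableFuture<Void>> step : steps) {
            chain = chain.thenCompose(ignored -> step.get());
        }
        return chain;
    }
}
```

Assuming the producer, consumer, and client variables from the examples on this page, a call could look like AsyncCloseOrder.closeInOrder(List.of(producer::closeAsync, consumer::closeAsync, client::closeAsync)). Because a failure stops the chain, you may prefer a variant that keeps closing the remaining objects and reports errors at the end.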

Create a consumer

In Pulsar, consumers subscribe to topics and handle messages that producers publish to those topics. You can instantiate a new consumer by first instantiating a PulsarClient object and passing it a URL for a Pulsar broker (as above).

Once you've instantiated a PulsarClient object, you can create a Consumer by specifying a topic and a subscription.

Consumer<byte[]> consumer = client.newConsumer()
        .topic("my-topic")
        .subscriptionName("my-subscription")
        .subscribe();

The subscribe method subscribes the consumer to the specified topic under the given subscription name. One way to make the consumer listen on the topic is to set up a while loop. In this example loop, the consumer waits for messages, prints the contents of each received message, and then acknowledges that the message has been processed. If the processing logic fails, the consumer sends a negative acknowledgment so that the message is redelivered later.

while (true) {
    // Wait for a message
    Message<byte[]> msg = consumer.receive();

    try {
        // Do something with the message
        System.out.println("Message received: " + new String(msg.getData()));

        // Acknowledge the message
        consumer.acknowledge(msg);
    } catch (Exception e) {
        // Message failed to process, redeliver later
        consumer.negativeAcknowledge(msg);
    }
}

If you don't want to block your main thread but still want to listen continuously for new messages, consider using a MessageListener. The MessageListener uses a thread pool inside the PulsarClient. You can set the number of threads to use for message listeners in the ClientBuilder.

MessageListener<byte[]> myMessageListener = (consumer, msg) -> {
    try {
        System.out.println("Message received: " + new String(msg.getData()));
        consumer.acknowledge(msg);
    } catch (Exception e) {
        consumer.negativeAcknowledge(msg);
    }
};

Consumer<byte[]> consumer = client.newConsumer()
        .topic("my-topic")
        .subscriptionName("my-subscription")
        .messageListener(myMessageListener)
        .subscribe();
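
The listener thread count mentioned above is set when the client is built. The following is a configuration sketch using the listenerThreads option of ClientBuilder; the service URL and the value 4 are illustrative assumptions, and the example needs the Pulsar client library on the classpath.

```java
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;

public class ListenerThreadsExample {
    public static void main(String[] args) throws PulsarClientException {
        PulsarClient client = PulsarClient.builder()
                // Assumption: a broker is reachable at this URL; replace it with your own.
                .serviceUrl("pulsar://localhost:6650")
                // Assumption: 4 is an arbitrary value chosen for illustration.
                .listenerThreads(4)
                .build();

        // Consumers created from this client with .messageListener(...)
        // have their listeners invoked on the client's listener threads.

        client.close();
    }
}
```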

Create a reader

With the reader interface, Pulsar clients can "manually position" themselves within a topic and read all messages from a specified message onward. The Pulsar API for Java enables you to create Reader objects by specifying a topic and a MessageId.

The following is an example.

byte[] msgIdBytes = // Some message ID byte array
MessageId id = MessageId.fromByteArray(msgIdBytes);
Reader<byte[]> reader = client.newReader()
        .topic(topic)
        .startMessageId(id)
        .create();

while (true) {
    Message<byte[]> message = reader.readNext();
    // Process message
}

In the example above, a Reader object is instantiated for a specific topic and message (by ID); the reader iterates over each message in the topic after the message identified by msgIdBytes (how that value is obtained depends on the application).

The code sample above points the Reader object to a specific message (by ID), but you can also use MessageId.earliest to point to the earliest available message on the topic or MessageId.latest to point to the most recent available message.
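
How msgIdBytes is obtained is left to the application. One possible approach, sketched here under the assumption that a local file is an acceptable place to keep the position, is to store the serialized ID of the last processed message and load it on the next start. The MessageIdCheckpoint class is a hypothetical helper that uses only the JDK and is not part of the Pulsar API.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;

// Hypothetical helper (not part of Pulsar): keeps a serialized message ID in a file.
public final class MessageIdCheckpoint {
    private MessageIdCheckpoint() {}

    /** Overwrites the checkpoint file with the serialized message ID. */
    public static void save(Path file, byte[] msgIdBytes) {
        try {
            Files.write(file, msgIdBytes);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns the stored bytes, or empty if no checkpoint has been written yet. */
    public static Optional<byte[]> load(Path file) {
        if (!Files.exists(file)) {
            return Optional.empty();
        }
        try {
            return Optional.of(Files.readAllBytes(file));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

After processing a message, the application could call MessageIdCheckpoint.save(path, message.getMessageId().toByteArray()). On startup, it could pass the loaded bytes to MessageId.fromByteArray, falling back to MessageId.earliest when no checkpoint exists.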