Work with reader

After setting up your clients, you can start working with readers.

Receive and read messages

A reader is just a consumer without a cursor. This means that Pulsar does not keep track of your progress and there is no need to acknowledge messages.

Here's an example that begins reading from the earliest available message on a topic.

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Reader;

// Create a reader on a topic, starting from the earliest available message
Reader<byte[]> reader = pulsarClient.newReader()
        .topic("reader-api-test")
        .startMessageId(MessageId.earliest)
        .create();

while (true) {
    // Blocks until the next message is available
    Message<byte[]> message = reader.readNext();

    // Process the message
}

To create a reader that reads from the latest available message:

Reader<byte[]> reader = pulsarClient.newReader()
        .topic(topic)
        .startMessageId(MessageId.latest)
        .create();

Read specific messages

To create a reader that reads from some message between the earliest and the latest:

// Placeholder: supply a byte array previously obtained from MessageId.toByteArray()
byte[] msgIdBytes = ...;
MessageId id = MessageId.fromByteArray(msgIdBytes);
Reader<byte[]> reader = pulsarClient.newReader()
        .topic(topic)
        .startMessageId(id)
        .create();
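
Because Pulsar does not track a reader's progress, an application that wants to resume later has to store the position itself. The following is a minimal sketch of one way to do that, assuming the position is kept in a local file. `ReaderCheckpoint` and its methods are hypothetical names, not part of the Pulsar client API, and the sketch uses only the Java standard library. The bytes would come from `MessageId.toByteArray()` and go back into `MessageId.fromByteArray(...)`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Optional;

// Hypothetical helper for illustration; not part of the Pulsar client API.
final class ReaderCheckpoint {
    private final Path file;

    ReaderCheckpoint(Path file) {
        this.file = file;
    }

    // Stores the serialized position, for example the bytes returned by
    // message.getMessageId().toByteArray() after a message is processed.
    void save(byte[] msgIdBytes) {
        try {
            // Write to a sibling temp file, then move it over the target, to
            // reduce the chance of leaving a half-written checkpoint behind.
            Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
            Files.write(tmp, msgIdBytes);
            Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns the stored bytes, or empty if nothing has been saved yet.
    Optional<byte[]> load() {
        try {
            return Files.exists(file)
                    ? Optional.of(Files.readAllBytes(file))
                    : Optional.empty();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

On startup, a caller could pass the result of `load()` to `MessageId.fromByteArray` and fall back to `MessageId.earliest` when nothing is stored. Whether the reader starts at the stored message or just after it depends on the builder configuration (`ReaderBuilder` offers `startMessageIdInclusive()`), so check this against your client version before relying on it.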

Configure chunking

Configuring chunking for readers is similar to that for consumers. See configure chunking for consumers for more information.

The following is an example of how to configure message chunking for a reader.

Reader<byte[]> reader = pulsarClient.newReader()
        .topic(topicName)
        .startMessageId(MessageId.earliest)
        .maxPendingChunkedMessage(12)
        .autoAckOldestChunkedMessageOnQueueFull(true)
        .expireTimeOfIncompleteChunkedMessage(12, TimeUnit.MILLISECONDS)
        .create();

Intercept messages

A Pulsar reader interceptor intercepts and possibly mutates messages with user-defined processing before the reader reads them. With reader interceptors, you can apply common processing to messages before they are read, such as modifying messages, adding properties, or collecting statistics, without building a separate mechanism for each.

Reader interceptor

The Pulsar reader interceptor works on top of the Pulsar consumer interceptor. The plugin interface ReaderInterceptor can be treated as a subset of ConsumerInterceptor, and it has two main events.

  • beforeRead is triggered before readers read messages. You can modify messages within this event.
  • onPartitionsChange is triggered when changes on partitions have been detected.

To handle these events and perform customized processing, add a ReaderInterceptor when creating a Reader, as follows.

PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
Reader<byte[]> reader = pulsarClient.newReader()
        .topic("t1")
        .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)
        .intercept(new ReaderInterceptor<byte[]>() {
            @Override
            public void close() {
            }

            @Override
            public Message<byte[]> beforeRead(Reader<byte[]> reader, Message<byte[]> message) {
                // user-defined processing logic
                return message;
            }

            @Override
            public void onPartitionsChange(String topicName, int partitions) {
                // user-defined processing logic
            }
        })
        .startMessageId(MessageId.earliest)
        .create();

Sticky key range reader

With a sticky key range reader, the broker dispatches only those messages whose key hash falls within the specified key hash range. You can specify multiple key hash ranges on a reader.

The following is an example of creating a sticky key range reader.

Reader<byte[]> reader = pulsarClient.newReader()
        .topic(topic)
        .startMessageId(MessageId.earliest)
        .keyHashRange(Range.of(0, 10000), Range.of(20001, 30000))
        .create();

The total hash range size is 65536, so the max end of the range should be less than or equal to 65535.
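
To make the range limits concrete, here is a small sketch of how an application might check its ranges before building a reader. It uses only the Java standard library. `KeyHashRanges` and `HashRange` are hypothetical stand-ins rather than Pulsar classes. The sketch assumes that both ends of a range are inclusive and that ranges on one reader should not overlap; neither assumption is confirmed by the text above.

```java
import java.util.List;

// Hypothetical helper for illustration; not part of the Pulsar client API.
final class KeyHashRanges {
    // Total hash range size stated above; valid hash values are 0..65535.
    static final int HASH_RANGE_SIZE = 65536;

    // Minimal stand-in for a range, assumed inclusive at both ends.
    static final class HashRange {
        final int start;
        final int end;

        HashRange(int start, int end) {
            this.start = start;
            this.end = end;
        }

        boolean contains(int hash) {
            return hash >= start && hash <= end;
        }

        boolean overlaps(HashRange other) {
            return start <= other.end && other.start <= end;
        }
    }

    // Throws if a range is out of bounds, inverted, or overlaps another range.
    // The overlap rule is an assumption of this sketch.
    static void validate(List<HashRange> ranges) {
        for (int i = 0; i < ranges.size(); i++) {
            HashRange r = ranges.get(i);
            if (r.start < 0 || r.end >= HASH_RANGE_SIZE || r.start > r.end) {
                throw new IllegalArgumentException(
                        "Range must lie within [0, 65535]: " + r.start + "-" + r.end);
            }
            for (int j = i + 1; j < ranges.size(); j++) {
                if (r.overlaps(ranges.get(j))) {
                    throw new IllegalArgumentException("Overlapping key hash ranges");
                }
            }
        }
    }

    // True if the given key hash falls inside any of the ranges.
    static boolean accepts(List<HashRange> ranges, int hash) {
        for (HashRange r : ranges) {
            if (r.contains(hash)) {
                return true;
            }
        }
        return false;
    }
}
```

With the ranges from the example above, 0 to 10000 and 20001 to 30000, `accepts` returns true for a hash of 10000 and false for 15000, while `validate` rejects a range ending at 65536.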