Skip to main content

Work with consumer

After setting up your clients, you can explore more to start working with consumers.

Subscribe to topics

Pulsar has various subscription types to match different scenarios. A topic can have multiple subscriptions with different subscription types. However, a subscription can only have one subscription type at a time.

A subscription is identical to the subscription name; a subscription name can specify only one subscription type at a time. To change the subscription type, you should first stop all consumers of this subscription.

Different subscription types have different message distribution types. This section describes the differences between subscription types and how to use them.

To better describe their differences, assume you have a topic named "my-topic", and the producer has published 10 messages.

Producer<String> producer = client.newProducer(Schema.STRING)
.topic("my-topic")
.enableBatching(false)
.create();
// 3 messages with "key-1", 3 messages with "key-2", 2 messages with "key-3" and 2 messages with "key-4"
producer.newMessage().key("key-1").value("message-1-1").send();
producer.newMessage().key("key-1").value("message-1-2").send();
producer.newMessage().key("key-1").value("message-1-3").send();
producer.newMessage().key("key-2").value("message-2-1").send();
producer.newMessage().key("key-2").value("message-2-2").send();
producer.newMessage().key("key-2").value("message-2-3").send();
producer.newMessage().key("key-3").value("message-3-1").send();
producer.newMessage().key("key-3").value("message-3-2").send();
producer.newMessage().key("key-4").value("message-4-1").send();
producer.newMessage().key("key-4").value("message-4-2").send();

Exclusive

Create a new consumer and subscribe with the Exclusive subscription type.

Consumer consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Exclusive)
.subscribe()

Only the first consumer is allowed to the subscription, and other consumers receive an error. The first consumer receives all 10 messages, and the consuming order is the same as the producing order.

note

If the topic is partitioned, the first consumer subscribes to all partitioned topics, and other consumers are not assigned with partitions and receive an error.

Failover

Create new consumers and subscribe with the Failover subscription type.

Consumer consumer1 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Failover)
.subscribe()
Consumer consumer2 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Failover)
.subscribe()
//conumser1 is the active consumer, consumer2 is the standby consumer.
//consumer1 receives 5 messages and then crashes, consumer2 takes over as an active consumer.

Multiple consumers can attach to the same subscription, yet only the first consumer is active, and others are standby. When the active consumer is disconnected, messages will be dispatched to one of standby consumers, and the standby consumer then becomes the active consumer.

If the first active consumer is disconnected after receiving 5 messages, the standby consumer becomes active consumer. Consumer1 will receive:

("key-1", "message-1-1")
("key-1", "message-1-2")
("key-1", "message-1-3")
("key-2", "message-2-1")
("key-2", "message-2-2")

consumer2 will receive:

("key-2", "message-2-3")
("key-3", "message-3-1")
("key-3", "message-3-2")
("key-4", "message-4-1")
("key-4", "message-4-2")
note

If a topic is a partitioned topic, each partition has only one active consumer, messages of one partition are distributed to only one consumer, and messages of multiple partitions are distributed to multiple consumers.

Shared

Create new consumers and subscribe with Shared subscription type.

Consumer consumer1 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.subscribe()

Consumer consumer2 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.subscribe()
//Both consumer1 and consumer2 are active consumers.

In Shared subscription type, multiple consumers can attach to the same subscription and messages are delivered in a round-robin distribution across consumers.

If a broker dispatches only one message at a time, consumer1 receives the following information.

("key-1", "message-1-1")
("key-1", "message-1-3")
("key-2", "message-2-2")
("key-3", "message-3-1")
("key-4", "message-4-1")

consumer2 receives the following information.

("key-1", "message-1-2")
("key-2", "message-2-1")
("key-2", "message-2-3")
("key-3", "message-3-2")
("key-4", "message-4-2")

The Shared subscription is different from the Exclusive and Failover subscription types. Shared subscription has better flexibility, but cannot provide an ordering guarantee.

Key_shared

This is a new subscription type since 2.4.0 release. Create new consumers and subscribe with Key_Shared subscription type.

Consumer consumer1 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Key_Shared)
.subscribe()

Consumer consumer2 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Key_Shared)
.subscribe()
//Both consumer1 and consumer2 are active consumers.

Just like in the Shared subscription, all consumers in the Key_Shared subscription type can attach to the same subscription. But the Key_Shared subscription type is different from the Shared subscription. In the Key_Shared subscription type, messages with the same key are delivered to only one consumer in order. The possible distribution of messages between different consumers (by default we do not know in advance which keys will be assigned to a consumer, but a key will only be assigned to a consumer at the same time).

consumer1 receives the following information.

("key-1", "message-1-1")
("key-1", "message-1-2")
("key-1", "message-1-3")
("key-3", "message-3-1")
("key-3", "message-3-2")

consumer2 receives the following information.

("key-2", "message-2-1")
("key-2", "message-2-2")
("key-2", "message-2-3")
("key-4", "message-4-1")
("key-4", "message-4-2")

If batching is enabled at the producer side, messages with different keys are added to a batch by default. The broker will dispatch the batch to the consumer, so the default batch mechanism may break the Key_Shared subscription guaranteed message distribution semantics. The producer needs to use the KeyBasedBatcher.

Producer producer = client.newProducer()
.topic("my-topic")
.batcherBuilder(BatcherBuilder.KEY_BASED)
.create();

Or the producer can disable batching.

Producer producer = client.newProducer()
.topic("my-topic")
.enableBatching(false)
.create();
note

If the message key is not specified, messages without keys are dispatched to one consumer in order by default.

Subscribe to multi-topics

In addition to subscribing a consumer to a single Pulsar topic, you can also subscribe to multiple topics simultaneously using multi-topic subscriptions. To use multi-topic subscriptions you can supply either a regular expression (regex) or a List of topics. If you select topics via regex, all topics must be within the same Pulsar namespace.

The followings are some examples.

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;

import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

ConsumerBuilder consumerBuilder = pulsarClient.newConsumer()
.subscriptionName(subscription);

// Subscribe to all topics in a namespace
Pattern allTopicsInNamespace = Pattern.compile("public/default/.*");
Consumer allTopicsConsumer = consumerBuilder
.topicsPattern(allTopicsInNamespace)
.subscribe();

// Subscribe to a subsets of topics in a namespace, based on regex
Pattern someTopicsInNamespace = Pattern.compile("public/default/foo.*");
Consumer allTopicsConsumer = consumerBuilder
.topicsPattern(someTopicsInNamespace)
.subscribe();

In the above example, the consumer subscribes to the persistent topics that can match the topic name pattern. If you want the consumer subscribes to all persistent and non-persistent topics that can match the topic name pattern, set subscriptionTopicsMode to RegexSubscriptionMode.AllTopics.

Pattern pattern = Pattern.compile("public/default/.*");
pulsarClient.newConsumer()
.subscriptionName("my-sub")
.topicsPattern(pattern)
.subscriptionTopicsMode(RegexSubscriptionMode.AllTopics)
.subscribe();
note

By default, the subscriptionTopicsMode of the consumer is PersistentOnly. Available options of subscriptionTopicsMode are PersistentOnly, NonPersistentOnly, and AllTopics.

You can also subscribe to an explicit list of topics (across namespaces if you wish):

List<String> topics = Arrays.asList(
"topic-1",
"topic-2",
"topic-3"
);

Consumer multiTopicConsumer = consumerBuilder
.topics(topics)
.subscribe();

// Alternatively:
Consumer multiTopicConsumer = consumerBuilder
.topic(
"topic-1",
"topic-2",
"topic-3"
)
.subscribe();

You can also subscribe to multiple topics asynchronously using the subscribeAsync method rather than the synchronous subscribe method. The following is an example.

Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default.*");
consumerBuilder
.topics(topics)
.subscribeAsync()
.thenAccept(this::receiveMessageFromConsumer);

private void receiveMessageFromConsumer(Object consumer) {
((Consumer)consumer).receiveAsync().thenAccept(message -> {
// Do something with the received message
receiveMessageFromConsumer(consumer);
});
}

Unsubscribe from topics

This example shows how a consumer unsubscribes from a topic.

consumer.unsubscribe();
note

A consumer cannot be used and is disposed once the consumer unsubscribes from a topic.

Receive messages

This example shows how a consumer receives messages from a topic.

Message message = consumer.receive();

Receive messages with timeout

consumer.receive(10, TimeUnit.SECONDS);

Async receive messages

The receive method receives messages synchronously (the consumer process is blocked until a message is available). You can also use async receive, which returns a CompletableFuture object immediately once a new message is available.

The following is an example.

CompletableFuture<Message> asyncMessage = consumer.receiveAsync();

Async receive operations return a Message wrapped inside of a CompletableFuture.

Batch receive messages

Use batchReceive to receive multiple messages for each call.

The following is an example.

Messages messages = consumer.batchReceive();
for (Object message : messages) {
// do something
}
consumer.acknowledge(messages)
note

Batch receive policy limits the number and bytes of messages in a single batch. You can specify a timeout to wait for enough messages. The batch receive is completed if any of the following conditions are met: enough number of messages, bytes of messages, wait timeout.

Consumer consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.batchReceivePolicy(BatchReceivePolicy.builder()
.maxNumMessages(100)
.maxNumBytes(1024 * 1024)
.timeout(200, TimeUnit.MILLISECONDS)
.build())
.subscribe();

The default batch receive policy is:

BatchReceivePolicy.builder()
.maxNumMessage(-1)
.maxNumBytes(10 * 1024 * 1024)
.timeout(100, TimeUnit.MILLISECONDS)
.build();

Acknowledge messages

Messages can be acknowledged individually or cumulatively. For details about message acknowledgment, see acknowledgment.

Acknowledge messages individually

consumer.acknowledge(msg);

Acknowledge messages cumulatively

consumer.acknowledgeCumulative(msg);

Negative acknowledgment redelivery backoff

The RedeliveryBackoff introduces a redelivery backoff mechanism. You can achieve redelivery with different delays by setting the redelivery count of messages.

Consumer consumer =  client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.negativeAckRedeliveryBackoff(MultiplierRedeliveryBackoff.builder()
.minDelayMs(1000)
.maxDelayMs(60 * 1000)
.build())
.subscribe();

Acknowledgment timeout redelivery backoff

The RedeliveryBackoff introduces a redelivery backoff mechanism. You can redeliver messages with different delays by setting the number of times the messages are retried.

Consumer consumer =  client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.ackTimeout(10, TimeUnit.SECOND)
.ackTimeoutRedeliveryBackoff(MultiplierRedeliveryBackoff.builder()
.minDelayMs(1000)
.maxDelayMs(60000)
.multiplier(2)
.build())
.subscribe();

The message redelivery behavior should be as follows.

Redelivery countRedelivery delay
110 + 1 seconds
210 + 2 seconds
310 + 4 seconds
410 + 8 seconds
510 + 16 seconds
610 + 32 seconds
710 + 60 seconds
810 + 60 seconds
note
  • The negativeAckRedeliveryBackoff does not work with consumer.negativeAcknowledge(MessageId messageId) because you are not able to get the redelivery count from the message ID.
  • If a consumer crashes, it triggers the redelivery of unacked messages. In this case, RedeliveryBackoff does not take effect and the messages might get redelivered earlier than the delay time from the backoff.

Configure chunking

You can limit the maximum number of chunked messages a consumer maintains concurrently by configuring specific parameters. When the configured threshold is reached, the consumer drops pending messages by silently acknowledging them or asking the broker to redeliver them later.

The following is an example of how to configure message chunking.

Consumer<byte[]> consumer = client.newConsumer()
.topic(topic)
.subscriptionName("test")
.autoAckOldestChunkedMessageOnQueueFull(true)
.maxPendingChunkedMessage(100)
.expireTimeOfIncompleteChunkedMessage(10, TimeUnit.MINUTES)
.subscribe();

Create a consumer with a message listener

You can avoid running a loop by blocking calls with an event-based style by using a message listener which is invoked for each message that is received.

Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic("persistent://my-property/my-ns/my-topic")
.subscriptionName("my-subscription")
.messageListener((c, m) -> {
try {
c.acknowledge(m);
} catch (Exception e) {
Assert.fail("Failed to acknowledge", e);
}
})
.subscribe();

Intercept messages

ConsumerInterceptors intercept and possibly mutate messages received by the consumer.

The interface has six main events:

  • beforeConsume is triggered before the message is returned by receive() or receiveAsync(). You can modify messages within this event.
  • onAcknowledge is triggered before the consumer sends the acknowledgement to the broker.
  • onAcknowledgeCumulative is triggered before the consumer sends the cumulative acknowledgement to the broker.
  • onNegativeAcksSend is triggered when a redelivery from a negative acknowledgement occurs.
  • onAckTimeoutSend is triggered when a redelivery from an acknowledgement timeout occurs.
  • onPartitionsChange is triggered when the partitions of the (partitioned) topic change.

To intercept messages, you can add one or multiple ConsumerInterceptors when creating a Consumer as follows.

Consumer<String> consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.intercept(new ConsumerInterceptor<String> {
@Override
public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
// user-defined processing logic
}

@Override
public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable cause) {
// user-defined processing logic
}

@Override
public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {
// user-defined processing logic
}

@Override
public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
// user-defined processing logic
}

@Override
public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageIds) {
// user-defined processing logic
}

@Override
public void onPartitionsChange(String topicName, int partitions) {
// user-defined processing logic
}
})
.subscribe();
note

If you are using multiple interceptors, they apply in the order they are passed to the intercept method.