Messaging
Pulsar is built on the publish-subscribe pattern (often abbreviated to pub-sub). In this pattern, producers publish messages to topics; consumers subscribe to those topics, process incoming messages, and send acknowledgments to the broker when processing is finished.
When a subscription is created, Pulsar retains all messages, even if the consumer is disconnected. The retained messages are discarded only when a consumer acknowledges that all these messages are processed successfully.
If the consumption of a message fails and you want this message to be consumed again, you can enable the message redelivery mechanism to request the broker to resend this message.
Messages
Messages are the basic "unit" of Pulsar. They're what producers publish to topics and what consumers then consume from topics. The following table lists the components of messages.
Component | Description |
---|---|
Value / data payload | The data carried by the message. All Pulsar messages contain raw bytes, although message data can also conform to data schemas. |
Key | The key (string type) of the message, also known as the message key or partition key. Messages are optionally tagged with keys, which is useful for features like topic compaction. |
Properties | An optional key/value map of user-defined properties. |
Producer name | The name of the producer that produces the message. If you do not specify a producer name, the default name is used. |
Topic name | The name of the topic that the message is published to. |
Schema version | The version number of the schema that the message is produced with. |
Sequence ID | Each Pulsar message belongs to an ordered sequence on its topic. The sequence ID of a message is initially assigned by its producer, indicating its order in that sequence, and can also be customized. Sequence ID can be used for message deduplication. If brokerDeduplicationEnabled is set to true, the sequence ID of each message is unique within a producer of a topic (non-partitioned) or a partition. |
Message ID | The message ID of a message is assigned by bookies as soon as the message is persistently stored. Message ID indicates a message's specific position in a ledger and is unique within a Pulsar cluster. |
Publish time | The timestamp of when the message is published. The timestamp is automatically applied by the producer. |
Event time | An optional timestamp attached to a message by applications. For example, an application can attach a timestamp indicating when the message was processed. If no event time is set, the value is 0. |
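The Sequence ID row says sequence IDs can be used for message deduplication. The Java sketch below shows one way that idea can work, assuming the broker remembers the highest sequence ID it has persisted for each producer and drops any message whose sequence ID is not greater. The class DedupSketch and its method are hypothetical illustrations, not Pulsar APIs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of sequence-ID-based deduplication; NOT Pulsar's broker code.
// Assumption: the broker tracks the highest sequence ID seen per producer and
// drops any message whose sequence ID is not greater than that value.
final class DedupSketch {
    private final Map<String, Long> highestSeqByProducer = new HashMap<>();

    /** Returns true if the message is persisted, false if it is dropped as a duplicate. */
    boolean publish(String producerName, long sequenceId) {
        Long highest = highestSeqByProducer.get(producerName);
        if (highest != null && sequenceId <= highest) {
            return false; // duplicate, e.g. a producer retry after a lost send receipt
        }
        highestSeqByProducer.put(producerName, sequenceId);
        return true;
    }

    public static void main(String[] args) {
        DedupSketch broker = new DedupSketch();
        System.out.println(broker.publish("producer-a", 0)); // true
        System.out.println(broker.publish("producer-a", 1)); // true
        System.out.println(broker.publish("producer-a", 1)); // false: resent message
        System.out.println(broker.publish("producer-b", 1)); // true: different producer
    }
}
```

Keying the state by producer name is what makes a sequence ID "unique within a producer": two producers may reuse the same numbers without interfering.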
The default max size of a message is 5 MB. You can configure the max size of a message with the following configuration options.
- In the broker.conf file:
  # The max size of a message (in bytes).
  maxMessageSize=5242880
- In the bookkeeper.conf file:
  # The max size of the netty frame (in bytes). Any messages received larger than this value are rejected. The default value is 5 MB.
  nettyMaxFrameSizeBytes=5253120
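Because bookies reject anything larger than nettyMaxFrameSizeBytes, the bookie frame limit needs to be larger than the broker's maxMessageSize, or bookies could reject messages that the broker accepted. The hypothetical check below (MessageSizeCheck is not a Pulsar tool) computes the headroom between the two defaults quoted above. The assumption is that the extra bytes cover protocol and metadata overhead around the payload.

```java
// Hypothetical sanity check, not a Pulsar tool. The constants are the defaults
// quoted above. Assumption: a bookie frame must hold a full-size message plus
// some protocol overhead, so nettyMaxFrameSizeBytes should exceed maxMessageSize.
final class MessageSizeCheck {
    static final long MAX_MESSAGE_SIZE = 5_242_880L;           // broker.conf, 5 MB
    static final long NETTY_MAX_FRAME_SIZE_BYTES = 5_253_120L; // bookkeeper.conf

    /** Bytes left in a bookie frame after a maximum-size message. */
    static long headroomBytes(long maxMessageSize, long nettyMaxFrameSizeBytes) {
        return nettyMaxFrameSizeBytes - maxMessageSize;
    }

    public static void main(String[] args) {
        long headroom = headroomBytes(MAX_MESSAGE_SIZE, NETTY_MAX_FRAME_SIZE_BYTES);
        System.out.println("headroom = " + headroom + " bytes"); // 10240 bytes (10 KB)
        if (headroom <= 0) {
            System.out.println("WARNING: bookies may reject messages the broker accepts");
        }
    }
}
```

Under this model, if you raise maxMessageSize you would raise nettyMaxFrameSizeBytes by at least the same amount.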
For more information on Pulsar messages, see Pulsar binary protocol.
Acknowledgment
A message acknowledgment is sent by a consumer to a broker after the consumer consumes a message successfully. The consumed message is retained until all subscriptions have acknowledged it, and only then can it be deleted. An acknowledgment (ack) is Pulsar's way of knowing that the message can be deleted from the system. If you want to store messages that have been acknowledged by a consumer, you need to configure the message retention policy.
For batch messages, you can enable batch index acknowledgment to avoid dispatching acknowledged messages to the consumer. For details about batch index acknowledgment, see batching.
Messages can be acknowledged in one of the following two ways:
- Being acknowledged individually
  With individual acknowledgment, the consumer acknowledges each message and sends an acknowledgment request to the broker.
- Being acknowledged cumulatively
  With cumulative acknowledgment, the consumer only acknowledges the last message it received. All messages in the stream up to (and including) the provided message are not redelivered to that consumer.
If you want to acknowledge messages individually, you can use the following API.
consumer.acknowledge(msg);
If you want to acknowledge messages cumulatively, you can use the following API.
consumer.acknowledgeCumulative(msg);
Cumulative acknowledgment cannot be used in the Shared or Key_Shared subscription type, because these subscription types involve multiple consumers that have access to the same subscription. In the Shared and Key_Shared subscription types, messages should be acknowledged individually.
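To make the difference between the two acknowledgment styles concrete, here is a small in-memory model of one subscription's pending messages. It is a hypothetical sketch: positions are plain long values rather than Pulsar MessageId objects, and AckModelSketch is not part of the Pulsar client.

```java
import java.util.TreeSet;

// Hypothetical model of acknowledgment bookkeeping for one subscription.
// Positions are plain longs here, not Pulsar MessageId objects.
final class AckModelSketch {
    private final TreeSet<Long> unacked = new TreeSet<>();

    void deliver(long position) { unacked.add(position); }

    /** Individual ack: only this one position is acknowledged. */
    void acknowledge(long position) { unacked.remove(position); }

    /** Cumulative ack: every position up to and including this one is acknowledged. */
    void acknowledgeCumulative(long position) {
        unacked.headSet(position, true).clear(); // clearing the view removes from the set
    }

    TreeSet<Long> pending() { return new TreeSet<>(unacked); }

    public static void main(String[] args) {
        AckModelSketch sub = new AckModelSketch();
        for (long p = 1; p <= 5; p++) {
            sub.deliver(p);
        }
        sub.acknowledge(2);           // pending: [1, 3, 4, 5]
        sub.acknowledgeCumulative(4); // pending: [5]
        System.out.println(sub.pending()); // [5]
    }
}
```

The model also shows why cumulative acknowledgment is unsafe with several consumers: if position 3 were still being processed by another consumer in a Shared subscription, the cumulative ack at position 4 would clear it anyway.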
Negative acknowledgment
The negative acknowledgment mechanism allows you to send a notification to the broker indicating the consumer did not process a message. When a consumer fails to consume a message and needs to re-consume it, the consumer sends a negative acknowledgment (nack) to the broker, triggering the broker to redeliver this message to the consumer.
Messages are negatively acknowledged individually or cumulatively, depending on the subscription type.
In Exclusive and Failover subscription types, consumers only negatively acknowledge the last message they receive.
In Shared and Key_Shared subscription types, consumers can negatively acknowledge messages individually.
Be aware that negative acknowledgments on ordered subscription types, such as Exclusive, Failover, and Key_Shared, might cause failed messages to be sent to consumers out of their original order.
If you are going to use negative acknowledgment on a message, make sure it is negatively acknowledged before the acknowledgment timeout.
Use the following API to negatively acknowledge message consumption.
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("sub-negative-ack")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.negativeAckRedeliveryDelay(2, TimeUnit.SECONDS) // the default value is 1 min
.subscribe();
Message<byte[]> message = consumer.receive();
// call the API to send negative acknowledgment
consumer.negativeAcknowledge(message);
// the negatively acknowledged message is redelivered after the 2-second delay
message = consumer.receive();
consumer.acknowledge(message);
To redeliver messages with different delays, you can use the redelivery backoff mechanism, which derives the redelivery delay from the number of times a message has already been redelivered.
Use the following API to enable negative redelivery backoff.
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("sub-negative-ack")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.negativeAckRedeliveryBackoff(MultiplierRedeliveryBackoff.builder()
.minDelayMs(1000)
.maxDelayMs(60 * 1000)
.multiplier(2)
.build())
.subscribe();
The message redelivery behavior should be as follows.
Redelivery count | Redelivery delay |
---|---|
1 | 1 second |
2 | 2 seconds |
3 | 4 seconds |
4 | 8 seconds |
5 | 16 seconds |
6 | 32 seconds |
7 | 60 seconds |
8 | 60 seconds |
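The delays in the table follow a capped exponential schedule. The delay starts at minDelayMs, is multiplied by multiplier for each further redelivery, and never exceeds maxDelayMs. The Java sketch below reproduces the table from the three builder values used above. NegativeAckBackoffSketch is a hypothetical class written for illustration and is not the source of MultiplierRedeliveryBackoff.

```java
// Hypothetical sketch that reproduces the redelivery-delay table above.
// It is NOT the Pulsar client's MultiplierRedeliveryBackoff implementation.
final class NegativeAckBackoffSketch {
    private final long minDelayMs;
    private final long maxDelayMs;
    private final double multiplier;

    NegativeAckBackoffSketch(long minDelayMs, long maxDelayMs, double multiplier) {
        this.minDelayMs = minDelayMs;
        this.maxDelayMs = maxDelayMs;
        this.multiplier = multiplier;
    }

    /** Delay before the n-th redelivery, where n starts at 1 as in the table. */
    long delayMs(int redeliveryCount) {
        if (redeliveryCount <= 1) {
            return minDelayMs;
        }
        double raw = minDelayMs * Math.pow(multiplier, redeliveryCount - 1);
        return (long) Math.min(raw, (double) maxDelayMs); // cap at maxDelayMs
    }

    public static void main(String[] args) {
        NegativeAckBackoffSketch backoff = new NegativeAckBackoffSketch(1000, 60 * 1000, 2);
        for (int n = 1; n <= 8; n++) {
            System.out.println(n + " -> " + backoff.delayMs(n) / 1000 + " s");
        }
    }
}
```

Capping the delay keeps a message that fails repeatedly from drifting into multi-hour waits; from the seventh redelivery on, every retry in this example waits the same 60 seconds.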
If batching is enabled, all messages in one batch are redelivered to the consumer.
Acknowledgment timeout
By default, the acknowledgment timeout is disabled, which means that messages delivered to a consumer are not redelivered unless the consumer crashes.
The acknowledgment timeout mechanism allows you to set a time range during which the client tracks unacknowledged messages. After this acknowledgment timeout (ackTimeout) period, the client sends a redeliver unacknowledged messages request to the broker, and the broker then resends the unacknowledged messages to the consumer.
You can configure the acknowledgment timeout mechanism to redeliver a message if it is not acknowledged within ackTimeout, and to run a timer task that checks for timed-out messages once every ackTimeoutTickTime period.
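The interplay between ackTimeout and ackTimeoutTickTime can be illustrated with a simplified client-side tracker driven by a virtual clock. This is a hypothetical model (AckTimeoutTrackerSketch is not a Pulsar class, and the real client uses its own data structures): each delivered message records its delivery time, acknowledging removes it, and every tick reports the messages whose timeout has expired so that a redelivery request could be sent for them.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of client-side ack-timeout tracking with a virtual clock.
// NOT the Pulsar client's implementation.
final class AckTimeoutTrackerSketch {
    private final long ackTimeoutMs;
    // message ID -> delivery time in ms; insertion order follows delivery order
    private final Map<String, Long> unacked = new LinkedHashMap<>();

    AckTimeoutTrackerSketch(long ackTimeoutMs) { this.ackTimeoutMs = ackTimeoutMs; }

    void delivered(String messageId, long nowMs) { unacked.put(messageId, nowMs); }

    void acknowledged(String messageId) { unacked.remove(messageId); }

    /** Runs once per tick; returns the IDs whose ack timeout has expired. */
    List<String> tick(long nowMs) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = unacked.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMs - entry.getValue() >= ackTimeoutMs) {
                expired.add(entry.getKey());
                it.remove(); // the redelivered copy is tracked again when it is received
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        AckTimeoutTrackerSketch tracker = new AckTimeoutTrackerSketch(2000); // ackTimeout = 2 s
        tracker.delivered("m1", 0);
        tracker.delivered("m2", 500);
        tracker.acknowledged("m2");
        for (long now = 1000; now <= 3000; now += 1000) { // ackTimeoutTickTime = 1 s
            System.out.println(now + " ms: redeliver " + tracker.tick(now));
        }
    }
}
```

In this model, expiry is only noticed on a tick, so a message can be reported up to one tick period after its ackTimeout has elapsed; a shorter tick time gives more precise timing at the cost of more frequent checks.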
You can also use the redelivery backoff mechanism to redeliver messages with different delays, where the delay depends on the number of times a message has already been redelivered.
If you want to use redelivery backoff, you can use the following API.
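Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("sub")
.ackTimeout(10, TimeUnit.SECONDS)
.ackTimeoutRedeliveryBackoff(MultiplierRedeliveryBackoff.builder()
.minDelayMs(1000)
.maxDelayMs(60 * 1000)
.multiplier(2)
.build())
.subscribe();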
The message redelivery behavior should be as follows.
Redelivery count | Redelivery delay |
---|---|
1 | 10 + 1 seconds |
2 | 10 + 2 seconds |
3 | 10 + 4 seconds |
4 | 10 + 8 seconds |
5 | 10 + 16 seconds |
6 | 10 + 32 seconds |
7 | 10 + 60 seconds |
8 | 10 + 60 seconds |
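Reading the table, each redelivery waits for the ack timeout plus a capped exponential backoff built from the builder values above. Writing T_ack for ackTimeout (10 s), d_min for minDelayMs (1 s), d_max for maxDelayMs (60 s), m for multiplier (2), and n for the redelivery count (symbol names chosen here for illustration), the table is consistent with:

```latex
\text{delay}(n) = T_{\text{ack}} + \min\!\left(d_{\min}\, m^{\,n-1},\ d_{\max}\right)

% n = 3: 10 + \min(1 \cdot 2^{2}, 60) = 10 + 4 = 14 \text{ s}
% n = 7: 10 + \min(1 \cdot 2^{6}, 60) = 10 + \min(64, 60) = 70 \text{ s}
```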
- If batching is enabled, all messages in one batch are redelivered to the consumer.
- Negative acknowledgment is preferred over acknowledgment timeout. First, it is difficult to set a suitable timeout value. Second, the broker resends messages whenever message processing takes longer than the acknowledgment timeout, even though these messages might not need to be re-consumed.
Use the following API to enable acknowledgment timeout.
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.ackTimeout(2, TimeUnit.SECONDS) // the default value is 0
.ackTimeoutTickTime(1, TimeUnit.SECONDS)
.subscriptionName("sub")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
Message<byte[]> message = consumer.receive();
// do not acknowledge the message; it is redelivered after the 2-second ack timeout expires
message = consumer.receive();
consumer.acknowledge(message);
Retry letter topic
Retry letter topic allows you to store the messages that failed to be consumed and retry consuming them later. With this method, you can customize the interval at which the messages are redelivered. Consumers on the original topic are automatically subscribed to the retry letter topic as well. Once the maximum number of retries has been reached, the unconsumed messages are moved to a dead letter topic for manual processing. The functionality of a retry letter topic is implemented by consumers.
The diagram below illustrates the concept of the retry letter topic.
The intention of using retry letter topic is different from using delayed message delivery, even though both are aiming to consume a message later. Retry letter topic serves failure handling through message redelivery to ensure critical data is not lost, while delayed message delivery is intended to deliver a message with a specified time delay.
By default, automatic retry is disabled. You can set enableRetry to true to enable automatic retry on the consumer.
Use the following API to consume messages from a retry letter topic. When the value of maxRedeliverCount is reached, the unconsumed messages are moved to a dead letter topic.
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.enableRetry(true)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.build())
.subscribe();
The default retry letter topic uses this format:
<topicname>-<subscriptionname>-RETRY
- For Pulsar 2.6.x and 2.7.x, the default retry letter topic uses the format of <subscriptionname>-RETRY. If you upgrade from 2.6.x~2.7.x to 2.8.x or later, you need to delete historical retry letter topics and retry letter partitioned topics. Otherwise, Pulsar continues to use the original topics, which are formatted with <subscriptionname>-RETRY.
- It is not recommended to use <subscriptionname>-RETRY, because if multiple topics under the same namespace have the same subscription, the retry letter topic names for those topics might be the same, which results in the topics consuming each other's retry messages.
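The collision described in the notes above is easy to see by building both name formats side by side. The helpers below are hypothetical (RetryTopicNameSketch is not a Pulsar client API), and short topic names are used for readability.

```java
// Hypothetical helpers, not a Pulsar client API: they only illustrate the two
// naming formats described above and why the older one can collide.
final class RetryTopicNameSketch {
    /** Default format in Pulsar 2.8.x and later: <topicname>-<subscriptionname>-RETRY */
    static String defaultRetryTopic(String topicName, String subscriptionName) {
        return topicName + "-" + subscriptionName + "-RETRY";
    }

    /** Format used by Pulsar 2.6.x and 2.7.x: <subscriptionname>-RETRY (topic is ignored) */
    static String legacyRetryTopic(String topicName, String subscriptionName) {
        return subscriptionName + "-RETRY";
    }

    public static void main(String[] args) {
        // Two topics in the same namespace, both with a subscription named "billing".
        System.out.println(defaultRetryTopic("orders", "billing"));   // orders-billing-RETRY
        System.out.println(defaultRetryTopic("payments", "billing")); // payments-billing-RETRY
        // With the legacy format, both topics map to the same retry letter topic.
        System.out.println(legacyRetryTopic("orders", "billing"));    // billing-RETRY
        System.out.println(legacyRetryTopic("payments", "billing"));  // billing-RETRY
    }
}
```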
Use the Java client to specify the name of the retry letter topic.
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.enableRetry(true)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.retryLetterTopic("my-retry-letter-topic-name")
.build())
.subscribe();
The messages in the retry letter topic contain some special properties that are automatically created by the client.
Special property | Description |
---|---|
REAL_TOPIC | The real topic name. |
ORIGIN_MESSAGE_ID | The origin message ID. It is crucial for message tracking. |
RECONSUMETIMES | The number of retries to consume messages. |
DELAY_TIME | Message retry interval in milliseconds. |
Example
REAL_TOPIC = persistent://public/default/my-topic
ORIGIN_MESSAGE_ID = 1:0:-1:0
RECONSUMETIMES = 6
DELAY_TIME = 3000
Use the following API to store a message in the retry letter topic so that it is consumed again after the given delay.
consumer.reconsumeLater(msg, 3, TimeUnit.SECONDS);
Use the following API to add custom properties to the reconsumeLater function. In the next consumption attempt, the custom properties can be retrieved with message#getProperty.
Map<String, String> customProperties = new HashMap<String, String>();
customProperties.put("custom-key-1", "custom-value-1");
customProperties.put("custom-key-2", "custom-value-2");
consumer.reconsumeLater(msg, customProperties, 3, TimeUnit.SECONDS);
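The special properties table and the custom-property API fit together as shown in the following model of one retry step. It is a hypothetical sketch, not the client's code. It assumes that RECONSUMETIMES is incremented on each reconsumeLater call, that DELAY_TIME records the requested delay in milliseconds, and that REAL_TOPIC and ORIGIN_MESSAGE_ID keep the values from the first attempt. RetryPropertiesSketch and nextRetryProperties are illustrative names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical model, NOT the Pulsar client's code: computes the properties a
// message might carry after one more reconsumeLater(...) call.
final class RetryPropertiesSketch {
    static Map<String, String> nextRetryProperties(Map<String, String> current,
                                                   String realTopic,
                                                   String originMessageId,
                                                   Map<String, String> customProperties,
                                                   long delay, TimeUnit unit) {
        Map<String, String> next = new HashMap<>(current);
        next.putAll(customProperties); // readable on the next attempt via message#getProperty
        // Special properties are written after the custom ones so they cannot be overwritten.
        int reconsumeTimes = Integer.parseInt(current.getOrDefault("RECONSUMETIMES", "0")) + 1;
        next.put("RECONSUMETIMES", Integer.toString(reconsumeTimes));
        next.put("DELAY_TIME", Long.toString(unit.toMillis(delay)));
        // Keep the values recorded on the first retry, if any.
        next.put("REAL_TOPIC", current.getOrDefault("REAL_TOPIC", realTopic));
        next.put("ORIGIN_MESSAGE_ID", current.getOrDefault("ORIGIN_MESSAGE_ID", originMessageId));
        return next;
    }

    public static void main(String[] args) {
        Map<String, String> custom = new HashMap<>();
        custom.put("custom-key-1", "custom-value-1");
        String topic = "persistent://public/default/my-topic";
        Map<String, String> first = nextRetryProperties(new HashMap<>(), topic, "1:0:-1:0",
                custom, 3, TimeUnit.SECONDS);
        Map<String, String> second = nextRetryProperties(first, topic, "1:0:-1:0",
                custom, 3, TimeUnit.SECONDS);
        System.out.println(second.get("RECONSUMETIMES") + " " + second.get("DELAY_TIME")); // 2 3000
    }
}
```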
- Currently, retry letter topic is enabled in the Shared subscription type.
- Compared with negative acknowledgment, retry letter topic is more suitable for messages that require a large number of retries with a configurable retry interval, because messages in the retry letter topic are persisted to BookKeeper, while messages that need to be retried due to negative acknowledgment are cached on the client side.
Dead letter topic
Dead letter topic allows you to continue message consumption even when some messages are not consumed successfully. The messages that have failed to be consumed are stored in a specific topic, which is called the dead letter topic. The functionality of a dead letter topic is implemented by consumers. You can decide how to handle the messages in the dead letter topic.
Enable dead letter topic in a Java client using the default dead letter topic.
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.build())
.subscribe();
The default dead letter topic uses this format:
<topicname>-<subscriptionname>-DLQ
The dead letter producerName uses this format:
<topicname>-<subscriptionname>-<consumername>-DLQ
- For Pulsar 2.6.x and 2.7.x, the default dead letter topic uses the format of <subscriptionname>-DLQ. If you upgrade from 2.6.x~2.7.x to 2.8.x or later, you need to delete historical dead letter topics and dead letter partitioned topics. Otherwise, Pulsar continues to use the original topics, which are formatted with <subscriptionname>-DLQ.
- It is not recommended to use <subscriptionname>-DLQ, because if multiple topics under the same namespace have the same subscription, the dead letter topic names for those topics might be the same, which results in the topics consuming each other's dead-lettered messages.
Use the Java client to specify the name of the dead letter topic.
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.deadLetterTopic("my-dead-letter-topic-name")
.build())
.subscribe();
By default, there is no subscription during DLQ topic creation. Without a just-in-time subscription to the DLQ topic, you may lose messages. To automatically create an initial subscription for the DLQ, you can specify the initialSubscriptionName parameter. If this parameter is set but the broker's allowAutoSubscriptionCreation is disabled, the DLQ producer will fail to be created.
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.deadLetterTopic("my-dead-letter-topic-name")
.initialSubscriptionName("init-sub")
.build())
.subscribe();
Dead letter topic serves message redelivery, which is triggered by acknowledgment timeout, negative acknowledgment, or retry letter topic.
Currently, dead letter topic is enabled in Shared and Key_Shared subscription types.
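The decision that sends a message to the dead letter topic can be modeled as a check against maxRedeliverCount after each failed attempt. The sketch below is hypothetical (DeadLetterSketch is not Pulsar's consumer code) and assumes a consistently failing message is redelivered until it has been redelivered maxRedeliverCount times, after which the next failure routes it to the DLQ instead of redelivering it again.

```java
// Hypothetical model of the dead-letter decision, NOT Pulsar's consumer code.
// Assumption: a failing message is redelivered until its redelivery count reaches
// maxRedeliverCount; the next failure sends it to the dead letter topic.
final class DeadLetterSketch {
    enum Outcome { ACKED, REDELIVER, SEND_TO_DLQ }

    static Outcome onProcessed(boolean success, int redeliveryCount, int maxRedeliverCount) {
        if (success) {
            return Outcome.ACKED;
        }
        return redeliveryCount >= maxRedeliverCount ? Outcome.SEND_TO_DLQ : Outcome.REDELIVER;
    }

    /** Number of delivery attempts a message that always fails receives in this model. */
    static int attemptsUntilDeadLetter(int maxRedeliverCount) {
        int attempts = 0;
        for (int redeliveryCount = 0; ; redeliveryCount++) {
            attempts++;
            if (onProcessed(false, redeliveryCount, maxRedeliverCount) == Outcome.SEND_TO_DLQ) {
                return attempts;
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(attemptsUntilDeadLetter(3)); // 4: the first delivery plus 3 redeliveries
    }
}
```

Under this assumption, a consumer configured with maxRedeliverCount = 3 would process a poison message four times before it is parked in the dead letter topic, which is worth keeping in mind when processing has side effects.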