Interface ConsumerBuilder<T>
- All Superinterfaces:
Cloneable
ConsumerBuilder
is used to configure and create instances of Consumer
.- Since:
- 2.0.0
- See Also:
-
Method Summary
Modifier and TypeMethodDescriptionacknowledgmentGroupTime
(long delay, TimeUnit unit) Sets amount of time for group consumer acknowledgments.ackTimeout
(long ackTimeout, TimeUnit timeUnit) Sets the timeout for unacknowledged messages, truncated to the nearest millisecond.ackTimeoutRedeliveryBackoff
(RedeliveryBackoff ackTimeoutRedeliveryBackoff) Sets the redelivery backoff policy for messages that are redelivered due to acknowledgement timeout.ackTimeoutTickTime
(long tickTime, TimeUnit timeUnit) Define the granularity of the ack-timeout redelivery.autoAckOldestChunkedMessageOnQueueFull
(boolean autoAckOldestChunkedMessageOnQueueFull) Buffering large number of outstanding uncompleted chunked messages can create memory pressure and it can be guarded by providing this @maxPendingChunkedMessage threshold.autoScaledReceiverQueueSizeEnabled
(boolean enabled) If this is enabled, the consumer receiver queue size is initialized as a very small value, 1 by default, and will double itself until it reaches the value set byreceiverQueueSize(int)
, if and only if:autoUpdatePartitions
(boolean autoUpdate) If enabled, the consumer auto-subscribes for partition increases.autoUpdatePartitionsInterval
(int interval, TimeUnit unit) Sets the interval of updating partitions (default: 1 minute).batchReceivePolicy
(BatchReceivePolicy batchReceivePolicy) SetsBatchReceivePolicy
for the consumer.clone()
Create a copy of the current consumer builder.consumerEventListener
(ConsumerEventListener consumerEventListener) Sets aConsumerEventListener
for the consumer.consumerName
(String consumerName) Sets the consumer name.Sets the ConsumerCryptoFailureAction to the value specified.cryptoKeyReader
(CryptoKeyReader cryptoKeyReader) Sets aCryptoKeyReader
.deadLetterPolicy
(DeadLetterPolicy deadLetterPolicy) Sets dead letter policy for a consumer.defaultCryptoKeyReader
(String privateKey) Sets the default implementation ofCryptoKeyReader
.defaultCryptoKeyReader
(Map<String, String> privateKeys) Sets the default implementation ofCryptoKeyReader
.enableBatchIndexAcknowledgment
(boolean batchIndexAcknowledgmentEnabled) Enable or disable batch index acknowledgment.enableRetry
(boolean retryEnable) If enabled, the consumer auto-retries messages.expireTimeOfIncompleteChunkedMessage
(long duration, TimeUnit unit) If the producer fails to publish all the chunks of a message, then the consumer can expire incomplete chunks if the consumer doesn't receive all chunks during the expiration period (default 1 minute).intercept
(ConsumerInterceptor<T>... interceptors) InterceptConsumer
.isAckReceiptEnabled
(boolean isAckReceiptEnabled) Enables or disables the acknowledgment receipt feature.keySharedPolicy
(KeySharedPolicy keySharedPolicy) Sets KeyShared subscription policy for consumer.Load the configuration from provided config map.maxAcknowledgmentGroupSize
(int messageNum) Set the number of messages for group consumer acknowledgments.maxPendingChuckedMessage
(int maxPendingChuckedMessage) Deprecated.maxPendingChunkedMessage
(int maxPendingChunkedMessage) Consumer buffers chunk messages into memory until it receives all the chunks of the original message.maxTotalReceiverQueueSizeAcrossPartitions
(int maxTotalReceiverQueueSizeAcrossPartitions) Sets the max total receiver queue size across partitions.messageCrypto
(MessageCrypto messageCrypto) Sets aMessageCrypto
.messageListener
(MessageListener<T> messageListener) Sets aMessageListener
for the consumer.messageListenerExecutor
(MessageListenerExecutor messageListenerExecutor) Set theMessageListenerExecutor
to be used for message listeners of current consumer.messagePayloadProcessor
(MessagePayloadProcessor payloadProcessor) If configured with a non-null value, the consumer uses the processor to process the payload, including decoding it to messages and triggering the listener.negativeAckRedeliveryBackoff
(RedeliveryBackoff negativeAckRedeliveryBackoff) negativeAckRedeliveryBackoff sets the redelivery backoff policy for messages that are negatively acknowledged using `consumer.negativeAcknowledge(Messageinvalid input: '<'?> message)` but not with `consumer.negativeAcknowledge(MessageId messageId)`.negativeAckRedeliveryDelay
(long redeliveryDelay, TimeUnit timeUnit) Sets the delay to wait before re-delivering messages that have failed to be processed.patternAutoDiscoveryPeriod
(int periodInMinutes) Sets topic's auto-discovery period when using a pattern for topic's consumer.patternAutoDiscoveryPeriod
(int interval, TimeUnit unit) Sets topic's auto-discovery period when using a pattern for topic's consumer.poolMessages
(boolean poolMessages) Enable pooling of messages and the underlying data buffers.priorityLevel
(int priorityLevel) Shared subscriptionproperties
(Map<String, String> properties) Add all the properties in the provided map to the consumer.Sets a name/value property with this consumer.readCompacted
(boolean readCompacted) If enabled, the consumer reads messages from the compacted topic rather than the full message topic backlog.receiverQueueSize
(int receiverQueueSize) Sets the size of the consumer receive queue.replicateSubscriptionState
(boolean replicateSubscriptionState) Sets the consumer to include the given position of any reset operation likeConsumer.seek(long)
orConsumer.seek(MessageId)
}.startPaused
(boolean paused) Starts the consumer in a paused state.Finalize theConsumer
creation by subscribing to the topic.Finalize theConsumer
creation by subscribing to the topic in asynchronous mode.subscriptionInitialPosition
(SubscriptionInitialPosition subscriptionInitialPosition) Sets theSubscriptionInitialPosition
for the consumer.subscriptionMode
(SubscriptionMode subscriptionMode) Selects the subscription mode to be used when subscribing to a topic.subscriptionName
(String subscriptionName) Specify the subscription name for this consumer.subscriptionProperties
(Map<String, String> subscriptionProperties) Specify the subscription properties for this subscription.subscriptionTopicsMode
(RegexSubscriptionMode regexSubscriptionMode) Determines which topics this consumer should be subscribed to - Persistent, Non-Persistent, or both.subscriptionType
(SubscriptionType subscriptionType) Select the subscription type to be used when subscribing to a topic.Specify the topics this consumer subscribes to.topicConfiguration
(String topicName) Configure topic specific options to override those set at theConsumerBuilder
level.topicConfiguration
(String topicName, Consumer<TopicConsumerBuilder<T>> builderConsumer) Configure topic specific options to override those set at theConsumerBuilder
level.topicConfiguration
(Pattern topicsPattern) Configure topic specific options to override those set at theConsumerBuilder
level.topicConfiguration
(Pattern topicsPattern, Consumer<TopicConsumerBuilder<T>> builderConsumer) Configure topic specific options to override those set at theConsumerBuilder
level.Specify a list of topics that this consumer subscribes to.topicsPattern
(String topicsPattern) Specify a pattern for topics(not contains the partition suffix) that this consumer subscribes to.topicsPattern
(Pattern topicsPattern) Specify a pattern for topics(not contains the partition suffix) that this consumer subscribes to.
-
Method Details
-
clone
ConsumerBuilder<T> clone()Create a copy of the current consumer builder.Cloning the builder can be used to share an incomplete configuration and specialize it multiple times. For example:
ConsumerBuilder<String> builder = client.newConsumer(Schema.STRING) .subscriptionName("my-subscription-name") .subscriptionType(SubscriptionType.Shared) .receiverQueueSize(10); Consumer<String> consumer1 = builder.clone().topic("my-topic-1").subscribe(); Consumer<String> consumer2 = builder.clone().topic("my-topic-2").subscribe();
- Returns:
- a cloned consumer builder object
-
loadConf
Load the configuration from provided config map.Example:
Map<String, Object> config = new HashMap<>(); config.put("ackTimeoutMillis", 1000); config.put("receiverQueueSize", 2000); Consumer<byte[]> builder = client.newConsumer() .loadConf(config) .subscribe(); Consumer<byte[]> consumer = builder.subscribe();
- Parameters:
config
- configuration to load- Returns:
- the consumer builder instance
-
subscribe
Finalize theConsumer
creation by subscribing to the topic.If the subscription does not exist, a new subscription is created. By default, the subscription is created at the end of the topic. See
subscriptionInitialPosition(SubscriptionInitialPosition)
to configure the initial position behavior.Once a subscription is created, it retains the data and the subscription cursor even if the consumer is not connected.
- Returns:
- the consumer builder instance
- Throws:
PulsarClientException
- if the subscribe operation fails
-
subscribeAsync
CompletableFuture<Consumer<T>> subscribeAsync()Finalize theConsumer
creation by subscribing to the topic in asynchronous mode.If the subscription does not exist, a new subscription is created. By default, the subscription is created at the end of the topic. See
subscriptionInitialPosition(SubscriptionInitialPosition)
to configure the initial position behavior.Once a subscription is created, it retains the data and the subscription cursor even if the consumer is not connected.
- Returns:
- a future that yields a
Consumer
instance - Throws:
PulsarClientException
- if the subscribe operation fails
-
topic
Specify the topics this consumer subscribes to.- Parameters:
topicNames
- a set of topics that the consumer subscribes to- Returns:
- the consumer builder instance
-
topics
Specify a list of topics that this consumer subscribes to.- Parameters:
topicNames
- a list of topics that the consumer subscribes to- Returns:
- the consumer builder instance
-
topicsPattern
Specify a pattern for topics(not contains the partition suffix) that this consumer subscribes to.The pattern is applied to subscribe to all topics, within a single namespace, that match the pattern.
The consumer automatically subscribes to topics created after itself.
- Parameters:
topicsPattern
- a regular expression to select a list of topics(not contains the partition suffix) to subscribe to- Returns:
- the consumer builder instance
-
topicsPattern
Specify a pattern for topics(not contains the partition suffix) that this consumer subscribes to.It accepts a regular expression that is compiled into a pattern internally. E.g., "persistent://public/default/pattern-topic-.*"
The pattern is applied to subscribe to all topics, within a single namespace, that match the pattern.
The consumer automatically subscribes to topics created after itself.
- Parameters:
topicsPattern
- given regular expression for topics(not contains the partition suffix) pattern- Returns:
- the consumer builder instance
-
subscriptionName
Specify the subscription name for this consumer.This argument is required when constructing the consumer.
- Parameters:
subscriptionName
- the name of the subscription that this consumer should attach to- Returns:
- the consumer builder instance
-
subscriptionProperties
Specify the subscription properties for this subscription. Properties are immutable, and consumers under the same subscription will fail to create a subscription if they use different properties.- Parameters:
subscriptionProperties
- the properties of the subscription- Returns:
- the consumer builder instance
-
ackTimeout
Sets the timeout for unacknowledged messages, truncated to the nearest millisecond. The timeout must be greater than 1 second.By default, the acknowledgment timeout is disabled (set to `0`, which means infinite). When a consumer with an infinite acknowledgment timeout terminates, any unacknowledged messages that it receives are re-delivered to another consumer.
When enabling acknowledgment timeout, if a message is not acknowledged within the specified timeout, it is re-delivered to the consumer (possibly to a different consumer, in the case of a shared subscription).
- Parameters:
ackTimeout
- for unacked messages.timeUnit
- unit in which the timeout is provided.- Returns:
- the consumer builder instance
-
isAckReceiptEnabled
Enables or disables the acknowledgment receipt feature.When this feature is enabled, the consumer ensures that acknowledgments are processed by the broker by waiting for a receipt from the broker. Even when the broker returns a receipt, it doesn't guarantee that the message won't be redelivered later due to certain implementation details. It is recommended to use the asynchronous
MessageAcknowledger.acknowledgeAsync(Message)
method for acknowledgment when this feature is enabled. This is because using the synchronousMessageAcknowledger.acknowledge(Message)
method with acknowledgment receipt can cause performance issues due to the round trip to the server, which prevents pipelining (having multiple messages in-flight). With the asynchronous method, the consumer can continue consuming other messages while waiting for the acknowledgment receipts.- Parameters:
isAckReceiptEnabled
-true
to enable acknowledgment receipt,false
to disable it- Returns:
- the consumer builder instance
-
ackTimeoutTickTime
Define the granularity of the ack-timeout redelivery.By default, the tick time is set to 1 second. Using a higher tick time reduces the memory overhead to track messages when the ack-timeout is set to bigger values (e.g., 1 hour).
- Parameters:
tickTime
- the min precision for the acknowledgment timeout messages trackertimeUnit
- unit in which the timeout is provided.- Returns:
- the consumer builder instance
-
negativeAckRedeliveryDelay
Sets the delay to wait before re-delivering messages that have failed to be processed.When application uses
Consumer.negativeAcknowledge(Message)
, the failed message is redelivered after a fixed timeout. The default is 1 min.- Parameters:
redeliveryDelay
- redelivery delay for failed messagestimeUnit
- unit in which the timeout is provided.- Returns:
- the consumer builder instance
- See Also:
-
subscriptionType
Select the subscription type to be used when subscribing to a topic.Options are:
- Parameters:
subscriptionType
- the subscription type value- Returns:
- the consumer builder instance
-
subscriptionMode
Selects the subscription mode to be used when subscribing to a topic.Options are:
- Parameters:
subscriptionMode
- the subscription mode value- Returns:
- the consumer builder instance
-
messageListener
Sets aMessageListener
for the consumer.The application receives messages through the message listener, and calls to
Consumer.receive()
are not allowed.- Parameters:
messageListener
- the listener object- Returns:
- the consumer builder instance
-
messageListenerExecutor
Set theMessageListenerExecutor
to be used for message listeners of current consumer. (default: use executor from PulsarClient,invalid reference
org.apache.pulsar.client.impl.PulsarClientImpl#externalExecutorProvider
The listener thread pool is exclusively owned by current consumer that are using a "listener" model to get messages. For a given internal consumer, the listener will always be invoked from the same thread, to ensure ordering.
The caller need to shut down the thread pool after closing the consumer to avoid leaks.
- Parameters:
messageListenerExecutor
- the executor of the consumer message listener- Returns:
- the consumer builder instance
-
cryptoKeyReader
Sets aCryptoKeyReader
.Configure the key reader to be used to decrypt message payloads.
- Parameters:
cryptoKeyReader
- CryptoKeyReader object- Returns:
- the consumer builder instance
-
defaultCryptoKeyReader
Sets the default implementation ofCryptoKeyReader
.Configure the key reader to be used to decrypt message payloads.
- Parameters:
privateKey
- the private key that is always used to decrypt message payloads.- Returns:
- the consumer builder instance
- Since:
- 2.8.0
-
defaultCryptoKeyReader
Sets the default implementation ofCryptoKeyReader
.Configure the key reader to be used to decrypt the message payloads.
- Parameters:
privateKeys
- the map of private key names and their URIs used to decrypt message payloads.- Returns:
- the consumer builder instance
- Since:
- 2.8.0
-
messageCrypto
Sets aMessageCrypto
.Contains methods to encrypt/decrypt messages for end-to-end encryption.
- Parameters:
messageCrypto
- MessageCrypto object- Returns:
- the consumer builder instance
-
cryptoFailureAction
Sets the ConsumerCryptoFailureAction to the value specified.- Parameters:
action
- the action the consumer takes in case of decryption failures- Returns:
- the consumer builder instance
-
receiverQueueSize
Sets the size of the consumer receive queue.The consumer receive queue controls how many messages can be accumulated by the
Consumer
before the application callsConsumer.receive()
. Using a higher value can potentially increase consumer throughput at the expense of bigger memory utilization.For the consumer that subscribes to the partitioned topic, the parameter
maxTotalReceiverQueueSizeAcrossPartitions(int)
also affects the number of messages accumulated in the consumer.Setting the consumer queue size as zero
- Decreases the throughput of the consumer by disabling pre-fetching of messages. This approach improves the
message distribution on shared subscriptions by pushing messages only to the consumers that are ready to process
them. Neither
Consumer.receive(int, TimeUnit)
nor Partitioned Topics can be used if the consumer queue size is zero.Consumer.receive()
function call should not be interrupted when the consumer queue size is zero. - Doesn't support Batch-Message. If a consumer receives a batch-message, it closes the consumer connection with
the broker and
Consumer.receive()
calls remain blocked whileConsumer.receiveAsync()
receives exception in callback. The consumer is not able to receive any further messages unless batch-message in pipeline is removed.
1000
messages and should be adequate for most use cases.- Parameters:
receiverQueueSize
- the new receiver queue size value- Returns:
- the consumer builder instance
- Decreases the throughput of the consumer by disabling pre-fetching of messages. This approach improves the
message distribution on shared subscriptions by pushing messages only to the consumers that are ready to process
them. Neither
-
acknowledgmentGroupTime
Sets amount of time for group consumer acknowledgments.By default, the consumer uses a 100 ms grouping time to send out acknowledgments to the broker.
Setting a group time of 0 sends out acknowledgments immediately. A longer acknowledgment group time is more efficient, but at the expense of a slight increase in message re-deliveries after a failure.
- Parameters:
delay
- the max amount of time an acknowledgement can be delayedunit
- the time unit for the delay- Returns:
- the consumer builder instance
-
maxAcknowledgmentGroupSize
Set the number of messages for group consumer acknowledgments.By default, the consumer uses at most 1000 messages to send out acknowledgments to the broker.
- Parameters:
messageNum
-- Returns:
- the consumer builder instance
-
replicateSubscriptionState
- Parameters:
replicateSubscriptionState
-
-
maxTotalReceiverQueueSizeAcrossPartitions
ConsumerBuilder<T> maxTotalReceiverQueueSizeAcrossPartitions(int maxTotalReceiverQueueSizeAcrossPartitions) Sets the max total receiver queue size across partitions.This setting is used to reduce the receiver queue size for individual partitions
receiverQueueSize(int)
if the total exceeds this value (default: 50000). The purpose of this setting is to have an upper-limit on the number of messages that a consumer can be pushed at once from a broker, across all the partitions.This setting is applicable only to consumers subscribing to partitioned topics. In such cases, there will be multiple queues for each partition and a single queue for the parent consumer. This setting controls the queues of all partitions, not the parent queue. For instance, if a consumer subscribes to a single partitioned topic, the total number of messages accumulated in this consumer will be the sum of
receiverQueueSize(int)
and maxTotalReceiverQueueSizeAcrossPartitions.- Parameters:
maxTotalReceiverQueueSizeAcrossPartitions
- max pending messages across all the partitions- Returns:
- the consumer builder instance
-
consumerName
Sets the consumer name.Consumer names are informative, and can be used to identify a particular consumer instance from the topic stats.
- Parameters:
consumerName
-- Returns:
- the consumer builder instance
-
consumerEventListener
Sets aConsumerEventListener
for the consumer.The consumer group listener is used for receiving consumer state changes in a consumer group for failover subscriptions. The application can then react to the consumer state changes.
- Parameters:
consumerEventListener
- the consumer group listener object- Returns:
- the consumer builder instance
-
readCompacted
If enabled, the consumer reads messages from the compacted topic rather than the full message topic backlog. This means that, if the topic has been compacted, the consumer will only see the latest value for each key in the topic, up until the point in the topic message backlog that has been compacted. Beyond that point, the messages are sent as normal.readCompacted can only be enabled on subscriptions to persistent topics with a single active consumer (i.e. failover or exclusive subscriptions). Enabling readCompacted on subscriptions to non-persistent topics or on shared subscriptions will cause the subscription call to throw a PulsarClientException.
- Parameters:
readCompacted
- whether to read from the compacted topic or full message topic backlog- Returns:
- the consumer builder instance
-
patternAutoDiscoveryPeriod
Sets topic's auto-discovery period when using a pattern for topic's consumer. The period is in minutes, and the default and minimum values are 1 minute.- Parameters:
periodInMinutes
- number of minutes between checks for new topics matching pattern set withtopicsPattern(String)
- Returns:
- the consumer builder instance
-
patternAutoDiscoveryPeriod
Sets topic's auto-discovery period when using a pattern for topic's consumer. The default value of period is 1 minute, with a minimum of 1 second.- Parameters:
interval
- the amount of delay between checks for new topics matching pattern set withtopicsPattern(String)
unit
- the unit of the topics auto discovery period- Returns:
- the consumer builder instance
-
priorityLevel
Shared subscriptionSets priority level for shared subscription consumers to determine which consumers the broker prioritizes when dispatching messages. Here, the broker follows descending priorities. (eg: 0=max-priority, 1, 2,..)
In Shared subscription mode, the broker first dispatches messages to max priority-level consumers if they have permits, otherwise the broker considers next priority level consumers.
If a subscription has consumer-A with priorityLevel 0 and Consumer-B with priorityLevel 1, then the broker dispatches messages to only consumer-A until it is drained, and then the broker will start dispatching messages to Consumer-B.
Consumer PriorityLevel Permits C1 0 2 C2 0 1 C3 0 1 C4 1 2 C5 1 1 Order in which broker dispatches messages to consumers: C1, C2, C3, C1, C4, C5, C4
Failover subscription for partitioned topic The broker selects the active consumer for a failover subscription for a partitioned topic based on consumer's priority-level and lexicographical sorting of consumer name. eg:
1. Active consumer = C1 : Same priority-level and lexicographical sorting Consumer PriorityLevel Name C1 0 aaa C2 0 bbb 2. Active consumer = C2 : Consumer with highest priority Consumer PriorityLevel Name C1 1 aaa C2 0 bbb Partitioned-topics: Broker evenly assigns partitioned topics to highest priority consumers.
Priority level has no effect on failover subscriptions for non-partitioned topics.
- Parameters:
priorityLevel
- the priority of this consumer- Returns:
- the consumer builder instance
-
property
Sets a name/value property with this consumer.Properties are application-defined metadata that can be attached to the consumer. When getting topic stats, this metadata is associated with the consumer stats for easier identification.
- Parameters:
key
- the property keyvalue
- the property value- Returns:
- the consumer builder instance
-
properties
Add all the properties in the provided map to the consumer.Properties are application-defined metadata that can be attached to the consumer. When getting topic stats, this metadata is associated with the consumer stats for easier identification.
- Parameters:
properties
- the map with properties- Returns:
- the consumer builder instance
-
subscriptionInitialPosition
ConsumerBuilder<T> subscriptionInitialPosition(SubscriptionInitialPosition subscriptionInitialPosition) Sets theSubscriptionInitialPosition
for the consumer.- Parameters:
subscriptionInitialPosition
- the position where to initialize a newly created subscription- Returns:
- the consumer builder instance
-
subscriptionTopicsMode
Determines which topics this consumer should be subscribed to - Persistent, Non-Persistent, or both. Only used with pattern subscriptions.- Parameters:
regexSubscriptionMode
- Pattern subscription mode
-
intercept
InterceptConsumer
.- Parameters:
interceptors
- the list of interceptors to intercept the consumer created by this builder.
-
deadLetterPolicy
Sets dead letter policy for a consumer.By default, messages are redelivered as many times as possible until they are acknowledged. If you enable a dead letter mechanism, messages will have a maxRedeliverCount. When a message exceeds the maximum number of redeliveries, the message is sent to the Dead Letter Topic and acknowledged automatically.
Enable the dead letter mechanism by setting dead letter policy. example:
client.newConsumer() .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(10).build()) .subscribe();
Default dead letter topic name is {TopicName}-{Subscription}-DLQ. To set a custom dead letter topic name:client.newConsumer() .deadLetterPolicy(DeadLetterPolicy .builder() .maxRedeliverCount(10) .deadLetterTopic("your-topic-name") .build()) .subscribe();
-
autoUpdatePartitions
If enabled, the consumer auto-subscribes for partition increases. This is only for partitioned consumers.- Parameters:
autoUpdate
- whether to auto-update partition increases
-
autoUpdatePartitionsInterval
Sets the interval of updating partitions (default: 1 minute). This only works if autoUpdatePartitions is enabled.- Parameters:
interval
- the interval of updating partitionsunit
- the time unit of the interval.- Returns:
- the consumer builder instance
-
startMessageIdInclusive
ConsumerBuilder<T> startMessageIdInclusive()Sets the consumer to include the given position of any reset operation likeConsumer.seek(long)
orConsumer.seek(MessageId)
}.- Returns:
- the consumer builder instance
-
batchReceivePolicy
SetsBatchReceivePolicy
for the consumer. By default, consumer usesBatchReceivePolicy.DEFAULT_POLICY
as batch receive policy.Example:
client.newConsumer().batchReceivePolicy(BatchReceivePolicy.builder() .maxNumMessages(100) .maxNumBytes(5 * 1024 * 1024) .timeout(100, TimeUnit.MILLISECONDS) .build()).subscribe();
-
enableRetry
If enabled, the consumer auto-retries messages. Default: disabled.- Parameters:
retryEnable
- whether to auto retry message
-
enableBatchIndexAcknowledgment
Enable or disable batch index acknowledgment. To enable this feature, ensure batch index acknowledgment is enabled on the broker side. -
maxPendingChuckedMessage
Deprecated.Consumer buffers chunk messages into memory until it receives all the chunks of the original message. While consuming chunk-messages, chunks from same message might not be contiguous in the stream and they might be mixed with other messages' chunks. so, consumer has to maintain multiple buffers to manage chunks coming from different messages. This mainly happens when multiple publishers are publishing messages on the topic concurrently or publisher failed to publish all chunks of the messages.eg: M1-C1, M2-C1, M1-C2, M2-C2 Here, Messages M1-C1 and M1-C2 belong to original message M1, M2-C1 and M2-C2 messages belong to M2 message.
Buffering large number of outstanding uncompleted chunked messages can create memory pressure and it can be guarded by providing this @maxPendingChuckedMessage threshold. Once, consumer reaches this threshold, it drops the outstanding unchunked-messages by silently acking or asking broker to redeliver later by marking it unacked. This behavior can be controlled by configuration: @autoAckOldestChunkedMessageOnQueueFull The default value is 10.- Parameters:
maxPendingChuckedMessage
-- Returns:
-
maxPendingChunkedMessage
Consumer buffers chunk messages into memory until it receives all the chunks of the original message. While consuming chunk-messages, chunks from same message might not be contiguous in the stream and they might be mixed with other messages' chunks. so, consumer has to maintain multiple buffers to manage chunks coming from different messages. This mainly happens when multiple publishers are publishing messages on the topic concurrently or publisher failed to publish all chunks of the messages.eg: M1-C1, M2-C1, M1-C2, M2-C2 Here, Messages M1-C1 and M1-C2 belong to original message M1, M2-C1 and M2-C2 messages belong to M2 message.
Buffering large number of outstanding uncompleted chunked messages can create memory pressure and it can be guarded by providing this @maxPendingChunkedMessage threshold. Once, consumer reaches this threshold, it drops the outstanding unchunked-messages by silently acking or asking broker to redeliver later by marking it unacked. This behavior can be controlled by configuration: @autoAckOldestChunkedMessageOnQueueFull The default value is 10.- Parameters:
maxPendingChunkedMessage
-- Returns:
-
autoAckOldestChunkedMessageOnQueueFull
ConsumerBuilder<T> autoAckOldestChunkedMessageOnQueueFull(boolean autoAckOldestChunkedMessageOnQueueFull) Buffering large number of outstanding uncompleted chunked messages can create memory pressure and it can be guarded by providing this @maxPendingChunkedMessage threshold. Once the consumer reaches this threshold, it drops the outstanding unchunked-messages by silently acknowledging if autoAckOldestChunkedMessageOnQueueFull is true, otherwise it marks them for redelivery.- Parameters:
autoAckOldestChunkedMessageOnQueueFull
-- Returns:
-
expireTimeOfIncompleteChunkedMessage
If the producer fails to publish all the chunks of a message, then the consumer can expire incomplete chunks if the consumer doesn't receive all chunks during the expiration period (default 1 minute).- Parameters:
duration
-unit
-- Returns:
-
poolMessages
Enable pooling of messages and the underlying data buffers. When pooling is enabled, the application is responsible for calling Message.release() after the handling of every received message. If “release()” is not called on a received message, it causes a memory leak. If an application attempts to use an already “released” message, it might experience undefined behavior (eg: memory corruption, deserialization error, etc.). -
messagePayloadProcessor
If configured with a non-null value, the consumer uses the processor to process the payload, including decoding it to messages and triggering the listener. Default: null -
negativeAckRedeliveryBackoff
negativeAckRedeliveryBackoff sets the redelivery backoff policy for messages that are negatively acknowledged using `consumer.negativeAcknowledge(Messageinvalid input: '<'?> message)` but not with `consumer.negativeAcknowledge(MessageId messageId)`. This setting allows specifying a backoff policy for messages that are negatively acknowledged, enabling more flexible control over the delay before such messages are redelivered.This configuration accepts a
RedeliveryBackoff
object that defines the backoff policy. The policy can be either a fixed delay or an exponential backoff. An exponential backoff policy is beneficial in scenarios where increasing the delay between consecutive redeliveries can help mitigate issues like temporary resource constraints or processing bottlenecks.Note: This backoff policy does not apply when using `consumer.negativeAcknowledge(MessageId messageId)` because the redelivery count cannot be determined from just the message ID. It is recommended to use `consumer.negativeAcknowledge(Messageinvalid input: '<'?> message)` if you want to leverage the redelivery backoff policy.
Example usage:
client.newConsumer() .negativeAckRedeliveryBackoff(ExponentialRedeliveryBackoff.builder() .minDelayMs(1000) // Set minimum delay to 1 second .maxDelayMs(60000) // Set maximum delay to 60 seconds .build()) .subscribe();
- Parameters:
negativeAckRedeliveryBackoff
- the backoff policy to use for negatively acknowledged messages- Returns:
- the consumer builder instance
-
ackTimeoutRedeliveryBackoff
Sets the redelivery backoff policy for messages that are redelivered due to acknowledgement timeout. This setting allows you to specify a backoff policy for messages that are not acknowledged within the specified ack timeout. By using a backoff policy, you can control the delay before a message is redelivered, potentially improving consumer performance by avoiding immediate redelivery of messages that might still be processing.This method accepts a
RedeliveryBackoff
object that defines the backoff policy to be used. You can use either a fixed backoff policy or an exponential backoff policy. The exponential backoff policy is particularly useful for scenarios where it may be beneficial to progressively increase the delay between redeliveries, reducing the load on the consumer and giving more time to process messages.Example usage:
client.newConsumer() .ackTimeout(10, TimeUnit.SECONDS) .ackTimeoutRedeliveryBackoff(ExponentialRedeliveryBackoff.builder() .minDelayMs(1000) // Set minimum delay to 1 second .maxDelayMs(60000) // Set maximum delay to 60 seconds .build()) .subscribe();
Note: This configuration is effective only if the ack timeout is triggered. It does not apply to messages negatively acknowledged using the negative acknowledgment API.
- Parameters:
ackTimeoutRedeliveryBackoff
- the backoff policy to use for messages that exceed their ack timeout- Returns:
- the consumer builder instance
-
startPaused
Starts the consumer in a paused state. When enabled, the consumer does not immediately fetch messages whensubscribe()
is called. Instead, the consumer waits to fetch messages untilConsumer.resume()
is called. See alsoConsumer.pause()
. -
autoScaledReceiverQueueSizeEnabled
If this is enabled, the consumer receiver queue size is initialized as a very small value, 1 by default, and will double itself until it reaches the value set byreceiverQueueSize(int)
, if and only if:1) User calls receive() and there are no messages in receiver queue.
2) The last message we put in the receiver queue took the last space available in receiver queue. This is disabled by default and currentReceiverQueueSize is initialized as maxReceiverQueueSize. The feature should be able to reduce client memory usage.
- Parameters:
enabled
- whether to enable AutoScaledReceiverQueueSize.
-
topicConfiguration
Configure topic specific options to override those set at theConsumerBuilder
level.- Parameters:
topicName
- a topic name- Returns:
- a
TopicConsumerBuilder
instance
-
topicConfiguration
ConsumerBuilder<T> topicConfiguration(String topicName, Consumer<TopicConsumerBuilder<T>> builderConsumer) Configure topic specific options to override those set at theConsumerBuilder
level.- Parameters:
topicName
- a topic namebuilderConsumer
- a consumer to allow the configuration of theTopicConsumerBuilder
instance
-
topicConfiguration
Configure topic specific options to override those set at theConsumerBuilder
level.- Parameters:
topicsPattern
- a regular expression to match a topic name- Returns:
- a
TopicConsumerBuilder
instance
-
topicConfiguration
ConsumerBuilder<T> topicConfiguration(Pattern topicsPattern, Consumer<TopicConsumerBuilder<T>> builderConsumer) Configure topic specific options to override those set at theConsumerBuilder
level.- Parameters:
topicsPattern
- a regular expression to match a topic namebuilderConsumer
- a consumer to allow the configuration of theTopicConsumerBuilder
instance
-
maxPendingChunkedMessage(int)