public interface ConsumerBuilder<T>
extends Cloneable
ConsumerBuilder is used to configure and create instances of Consumer.
See also: PulsarClient.newConsumer()
Modifier and Type | Method and Description |
---|---|
`ConsumerBuilder<T>` | `acknowledgmentGroupTime(long delay, TimeUnit unit)` Group the consumer acknowledgments for the specified time. |
`ConsumerBuilder<T>` | `ackTimeout(long ackTimeout, TimeUnit timeUnit)` Set the timeout for unacked messages, truncated to the nearest millisecond. |
`ConsumerBuilder<T>` | `clone()` Create a copy of the current consumer builder. |
`ConsumerBuilder<T>` | `consumerEventListener(ConsumerEventListener consumerEventListener)` Sets a ConsumerEventListener for the consumer. |
`ConsumerBuilder<T>` | `consumerName(String consumerName)` Set the consumer name. |
`ConsumerBuilder<T>` | `cryptoFailureAction(ConsumerCryptoFailureAction action)` Sets the ConsumerCryptoFailureAction to the value specified. |
`ConsumerBuilder<T>` | `cryptoKeyReader(CryptoKeyReader cryptoKeyReader)` Sets a CryptoKeyReader. |
`ConsumerBuilder<T>` | `deadLetterPolicy(DeadLetterPolicy deadLetterPolicy)` Set dead letter policy for consumer. By default, a message may be redelivered any number of times, possibly without ever stopping. |
`ConsumerBuilder<T>` | `intercept(ConsumerInterceptor<T>... interceptors)` Intercept Consumer. |
`ConsumerBuilder<T>` | `loadConf(Map<String,Object> config)` Load the configuration from provided config map. |
`ConsumerBuilder<T>` | `maxTotalReceiverQueueSizeAcrossPartitions(int maxTotalReceiverQueueSizeAcrossPartitions)` Set the max total receiver queue size across partitions. |
`ConsumerBuilder<T>` | `messageListener(MessageListener<T> messageListener)` Sets a MessageListener for the consumer. |
`ConsumerBuilder<T>` | `patternAutoDiscoveryPeriod(int periodInMinutes)` Set topics auto discovery period when using a pattern for topics consumer. |
`ConsumerBuilder<T>` | `priorityLevel(int priorityLevel)` Sets the priority level for shared subscription consumers, which the broker uses to decide which consumers get priority while dispatching messages. |
`ConsumerBuilder<T>` | `properties(Map<String,String> properties)` Add all the properties in the provided map. |
`ConsumerBuilder<T>` | `property(String key, String value)` Set a name/value property with this consumer. |
`ConsumerBuilder<T>` | `readCompacted(boolean readCompacted)` If enabled, the consumer will read messages from the compacted topic rather than reading the full message backlog of the topic. |
`ConsumerBuilder<T>` | `receiverQueueSize(int receiverQueueSize)` Sets the size of the consumer receive queue. |
`Consumer<T>` | `subscribe()` Finalize the Consumer creation by subscribing to the topic. |
`CompletableFuture<Consumer<T>>` | `subscribeAsync()` Finalize the Consumer creation by subscribing to the topic in asynchronous mode. |
`ConsumerBuilder<T>` | `subscriptionInitialPosition(SubscriptionInitialPosition subscriptionInitialPosition)` Set subscriptionInitialPosition for the consumer. |
`ConsumerBuilder<T>` | `subscriptionName(String subscriptionName)` Specify the subscription name for this consumer. |
`ConsumerBuilder<T>` | `subscriptionTopicsMode(Mode mode)` Determines which topics this consumer should be subscribed to: persistent, non-persistent, or both. |
`ConsumerBuilder<T>` | `subscriptionType(SubscriptionType subscriptionType)` Select the subscription type to be used when subscribing to the topic. |
`ConsumerBuilder<T>` | `topic(String... topicNames)` Specify the topics this consumer will subscribe to. |
`ConsumerBuilder<T>` | `topics(List<String> topicNames)` Specify a list of topics that this consumer will subscribe to. |
`ConsumerBuilder<T>` | `topicsPattern(java.util.regex.Pattern topicsPattern)` Specify a pattern for topics that this consumer will subscribe to. |
`ConsumerBuilder<T>` | `topicsPattern(String topicsPattern)` Specify a pattern for topics that this consumer will subscribe to. |

ConsumerBuilder<T> clone()
Create a copy of the current consumer builder.
Cloning the builder can be used to share an incomplete configuration and specialize it multiple times. For example:

    ConsumerBuilder<byte[]> builder = client.newConsumer()
            .subscriptionName("my-subscription-name")
            .subscriptionType(SubscriptionType.Shared)
            .receiverQueueSize(10);

    Consumer<byte[]> consumer1 = builder.clone().topic(TOPIC_1).subscribe();
    Consumer<byte[]> consumer2 = builder.clone().topic(TOPIC_2).subscribe();

ConsumerBuilder<T> loadConf(Map<String,Object> config)
Load the configuration from provided config map.
Example:

    Map<String, Object> config = new HashMap<>();
    config.put("ackTimeoutMillis", 1000);
    config.put("receiverQueueSize", 2000);

    ConsumerBuilder<byte[]> builder = ...;
    builder = builder.loadConf(config);

    Consumer<byte[]> consumer = builder.subscribe();

Parameters:
config - configuration to load

Consumer<T> subscribe() throws PulsarClientException
Finalize the Consumer creation by subscribing to the topic.
If the subscription does not exist, a new subscription will be created and all messages published after the creation will be retained until acknowledged, even if the consumer is not connected.
Returns:
the Consumer instance
Throws:
PulsarClientException - if the subscribe operation fails

CompletableFuture<Consumer<T>> subscribeAsync()
Finalize the Consumer creation by subscribing to the topic in asynchronous mode.
If the subscription does not exist, a new subscription will be created and all messages published after the creation will be retained until acknowledged, even if the consumer is not connected.
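Returns:
a future that yields the Consumer instance
Throws:
PulsarClientException - if the subscribe operation fails

ConsumerBuilder<T> topic(String... topicNames)
Specify the topics this consumer will subscribe to.
Parameters:
topicNames

ConsumerBuilder<T> topics(List<String> topicNames)
Specify a list of topics that this consumer will subscribe to.
Parameters:
topicNames

ConsumerBuilder<T> topicsPattern(java.util.regex.Pattern topicsPattern)
Specify a pattern for topics that this consumer will subscribe to.
Parameters:
topicsPattern

ConsumerBuilder<T> topicsPattern(String topicsPattern)
Specify a pattern for topics that this consumer will subscribe to.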
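To illustrate what a string pattern selects, the sketch below filters a list of topic names with java.util.regex. The topic names and the helper matchingTopics are hypothetical, and the sketch assumes the whole topic name has to match the expression; the client's actual discovery and matching rules may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Illustrative only: models which topic names a regular expression would select.
final class TopicPatternSketch {

    /** Returns the topic names that fully match the given regular expression. */
    static List<String> matchingTopics(String regex, List<String> topicNames) {
        Pattern pattern = Pattern.compile(regex);
        List<String> matched = new ArrayList<>();
        for (String name : topicNames) {
            // Assumption: the entire topic name must match the pattern.
            if (pattern.matcher(name).matches()) {
                matched.add(name);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        // Hypothetical topic names.
        List<String> topics = Arrays.asList(
                "persistent://public/default/orders-eu",
                "persistent://public/default/orders-us",
                "persistent://public/default/payments");
        // Selects the two "orders-" topics and skips "payments".
        System.out.println(matchingTopics("persistent://public/default/orders-.*", topics));
    }
}
```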
Parameters:
topicsPattern - given regular expression for topics pattern

ConsumerBuilder<T> subscriptionName(String subscriptionName)
Specify the subscription name for this consumer.
This argument is required when constructing the consumer.
Parameters:
subscriptionName

ConsumerBuilder<T> ackTimeout(long ackTimeout, TimeUnit timeUnit)
Set the timeout for unacked messages, truncated to the nearest millisecond.
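The truncation to milliseconds can be pictured with the standard TimeUnit conversion. This is a sketch that assumes the builder converts the value the way TimeUnit.toMillis does; the helper name toAckTimeoutMillis is hypothetical and not part of the client API.

```java
import java.util.concurrent.TimeUnit;

// Illustrative only: shows how a timeout in any unit truncates to whole milliseconds.
final class AckTimeoutSketch {

    /** Converts the timeout to milliseconds, dropping any sub-millisecond remainder. */
    static long toAckTimeoutMillis(long ackTimeout, TimeUnit timeUnit) {
        // TimeUnit.toMillis truncates when converting from a finer unit.
        return timeUnit.toMillis(ackTimeout);
    }

    public static void main(String[] args) {
        System.out.println(toAckTimeoutMillis(1500, TimeUnit.MICROSECONDS)); // 1500 us becomes 1 ms
        System.out.println(toAckTimeoutMillis(2, TimeUnit.SECONDS));         // 2 s becomes 2000 ms
    }
}
```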
Parameters:
ackTimeout - timeout for unacked messages.
timeUnit - unit in which the timeout is provided.

ConsumerBuilder<T> subscriptionType(SubscriptionType subscriptionType)
Select the subscription type to be used when subscribing to the topic.
Default is SubscriptionType.Exclusive
Parameters:
subscriptionType - the subscription type value

ConsumerBuilder<T> messageListener(MessageListener<T> messageListener)
Sets a MessageListener for the consumer.
When a MessageListener is set, the application will receive messages through it. Calls to Consumer.receive() will not be allowed.
Parameters:
messageListener - the listener object

ConsumerBuilder<T> cryptoKeyReader(CryptoKeyReader cryptoKeyReader)
Sets a CryptoKeyReader.
Parameters:
cryptoKeyReader - CryptoKeyReader object

ConsumerBuilder<T> cryptoFailureAction(ConsumerCryptoFailureAction action)
Sets the ConsumerCryptoFailureAction to the value specified.
Parameters:
action - The consumer action

ConsumerBuilder<T> receiverQueueSize(int receiverQueueSize)
Sets the size of the consumer receive queue.
The consumer receive queue controls how many messages can be accumulated by the Consumer before the application calls Consumer.receive(). Using a higher value could potentially increase the consumer throughput at the expense of bigger memory utilization.
Setting the consumer queue size as zero has the following consequences:
- Neither Consumer.receive(int, TimeUnit) nor Partitioned Topics can be used if the consumer queue size is zero. The Consumer.receive() function call should not be interrupted when the consumer queue size is zero.
- When a batch message is received, the Consumer.receive() call will remain blocked while Consumer.receiveAsync() receives an exception in its callback. The consumer will not be able to receive any further messages unless the batch message in the pipeline is removed.
The default value is 1000 messages and should be good for most use cases.
Parameters:
receiverQueueSize - the new receiver queue size value

ConsumerBuilder<T> acknowledgmentGroupTime(long delay, TimeUnit unit)
Group the consumer acknowledgments for the specified time.
By default, the consumer will use a 100 ms grouping time to send out the acknowledgments to the broker.
Setting a group time of 0 will send out the acknowledgments immediately.
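The grouping behaviour can be sketched with a small in-memory model. This is not the client's implementation: the class AckGroupingSketch, its methods, and the explicit nowMillis clock argument are all hypothetical, and the sketch assumes a group is sent once the oldest pending acknowledgment has waited for the full group time.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: buffers acknowledgments and sends them in groups.
final class AckGroupingSketch {
    private final long groupTimeMillis;
    private final List<Long> pending = new ArrayList<>();
    private final List<List<Long>> sentBatches = new ArrayList<>();
    private long groupStartMillis;

    AckGroupingSketch(long groupTimeMillis) {
        this.groupTimeMillis = groupTimeMillis;
    }

    /** Records an acknowledgment; with a group time of 0 it is sent right away. */
    void acknowledge(long messageId, long nowMillis) {
        if (pending.isEmpty()) {
            groupStartMillis = nowMillis; // first ack opens a new group
        }
        pending.add(messageId);
        flushIfDue(nowMillis);
    }

    /** Sends the pending group once it has waited at least the group time. */
    void flushIfDue(long nowMillis) {
        if (!pending.isEmpty() && nowMillis - groupStartMillis >= groupTimeMillis) {
            sentBatches.add(new ArrayList<>(pending));
            pending.clear();
        }
    }

    /** Groups that have been "sent to the broker" so far, in order. */
    List<List<Long>> sentBatches() {
        return sentBatches;
    }
}
```

With a 100 ms group time, acknowledgments recorded at 0 ms and 50 ms leave in a single group once flushIfDue is called at 100 ms; with a group time of 0, every acknowledgment forms its own group immediately.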
Parameters:
delay - the max amount of time an acknowledgement can be delayed
unit - the time unit for the delay

ConsumerBuilder<T> maxTotalReceiverQueueSizeAcrossPartitions(int maxTotalReceiverQueueSizeAcrossPartitions)
Set the max total receiver queue size across partitions.
This setting will be used to reduce the receiver queue size for individual partitions, as set with receiverQueueSize(int), if the total exceeds this value (default: 50000).
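One way to picture the reduction is as a cap on each partition's queue. The sketch below assumes the total budget is divided evenly across partitions, which the text above does not state; the helper perPartitionQueueSize is hypothetical.

```java
// Illustrative only: caps the per-partition queue so the sum stays within the total.
final class ReceiverQueueSketch {

    /**
     * Returns the queue size to use for each partition.
     * Assumption: the total budget is split evenly between partitions.
     */
    static int perPartitionQueueSize(int receiverQueueSize,
                                     int maxTotalAcrossPartitions,
                                     int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        return Math.min(receiverQueueSize, maxTotalAcrossPartitions / numPartitions);
    }

    public static void main(String[] args) {
        // 10 partitions x 1000 = 10000 <= 50000, so the queue size is unchanged.
        System.out.println(perPartitionQueueSize(1000, 50000, 10));
        // 100 partitions x 1000 = 100000 > 50000, so each queue is reduced to 500.
        System.out.println(perPartitionQueueSize(1000, 50000, 100));
    }
}
```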
Parameters:
maxTotalReceiverQueueSizeAcrossPartitions

ConsumerBuilder<T> consumerName(String consumerName)
Set the consumer name.
Parameters:
consumerName

ConsumerBuilder<T> consumerEventListener(ConsumerEventListener consumerEventListener)
Sets a ConsumerEventListener for the consumer.
The consumer group listener is used for receiving consumer state changes in a consumer group for failover subscription. The application can then react to the consumer state changes.
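Parameters:
consumerEventListener - the consumer group listener object

ConsumerBuilder<T> readCompacted(boolean readCompacted)
If enabled, the consumer will read messages from the compacted topic rather than reading the full message backlog of the topic.
Parameters:
readCompacted - whether to read from the compacted topic

ConsumerBuilder<T> patternAutoDiscoveryPeriod(int periodInMinutes)
Set topics auto discovery period when using a pattern for topics consumer.
Parameters:
periodInMinutes - number of minutes between checks for new topics matching pattern set with topicsPattern(String)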
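
ConsumerBuilder<T> priorityLevel(int priorityLevel)
Sets the priority level for shared subscription consumers, which the broker uses to decide which consumers get priority while dispatching messages. For example:

Consumer | PriorityLevel | Permits |
---|---|---|
C1 | 0 | 2 |
C2 | 0 | 1 |
C3 | 0 | 1 |
C4 | 1 | 2 |
C5 | 1 | 1 |

Order in which broker dispatches messages to consumers: C1, C2, C3, C1, C4, C5, C4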
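The dispatch order in this example can be reproduced with a small simulation. It assumes the broker serves the lowest level number first and rotates round-robin among consumers of the same level until their permits run out; that rule is inferred from the example, and PriorityDispatchSketch and ConsumerState are hypothetical names, not broker code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

// Illustrative only: simulates dispatching one message per available permit.
final class PriorityDispatchSketch {

    static final class ConsumerState {
        final String name;
        final int priorityLevel;
        int permits;

        ConsumerState(String name, int priorityLevel, int permits) {
            this.name = name;
            this.priorityLevel = priorityLevel;
            this.permits = permits;
        }
    }

    /** Returns the consumer names in dispatch order; consumes the given permits. */
    static List<String> dispatchOrder(List<ConsumerState> consumers) {
        // Group consumers by level; TreeMap iterates levels in ascending order.
        TreeMap<Integer, List<ConsumerState>> byLevel = new TreeMap<>();
        for (ConsumerState c : consumers) {
            byLevel.computeIfAbsent(c.priorityLevel, k -> new ArrayList<>()).add(c);
        }
        List<String> order = new ArrayList<>();
        for (List<ConsumerState> level : byLevel.values()) {
            boolean dispatched = true;
            // Round-robin within the level until no consumer has permits left.
            while (dispatched) {
                dispatched = false;
                for (ConsumerState c : level) {
                    if (c.permits > 0) {
                        c.permits--;
                        order.add(c.name);
                        dispatched = true;
                    }
                }
            }
        }
        return order;
    }

    public static void main(String[] args) {
        List<ConsumerState> consumers = Arrays.asList(
                new ConsumerState("C1", 0, 2),
                new ConsumerState("C2", 0, 1),
                new ConsumerState("C3", 0, 1),
                new ConsumerState("C4", 1, 2),
                new ConsumerState("C5", 1, 1));
        System.out.println(dispatchOrder(consumers)); // [C1, C2, C3, C1, C4, C5, C4]
    }
}
```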
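Parameters:
priorityLevel

ConsumerBuilder<T> property(String key, String value)
Set a name/value property with this consumer.
Parameters:
key
value

ConsumerBuilder<T> properties(Map<String,String> properties)
Add all the properties in the provided map.
Parameters:
properties

ConsumerBuilder<T> subscriptionInitialPosition(SubscriptionInitialPosition subscriptionInitialPosition)
Set subscriptionInitialPosition for the consumer.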
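
ConsumerBuilder<T> subscriptionTopicsMode(Mode mode)
Determines which topics this consumer should be subscribed to: persistent, non-persistent, or both.
Parameters:
mode - Pattern subscription mode

ConsumerBuilder<T> intercept(ConsumerInterceptor<T>... interceptors)
Intercept Consumer.
Parameters:
interceptors - the list of interceptors to intercept the consumer created by this builder.

ConsumerBuilder<T> deadLetterPolicy(DeadLetterPolicy deadLetterPolicy)
Set dead letter policy for consumer.
By default, a message may be redelivered any number of times, possibly without ever stopping.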
Example:

    client.newConsumer()
            .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(10).build())
            .subscribe();

Default dead letter topic name is {TopicName}-{Subscription}-DLQ. To set a custom dead letter topic name:

    client.newConsumer()
            .deadLetterPolicy(DeadLetterPolicy.builder()
                    .maxRedeliverCount(10)
                    .deadLetterTopic("your-topic-name")
                    .build())
            .subscribe();

When a dead letter policy is specified, and no ackTimeoutMillis is specified, then the ack timeout will be set to 30000 milliseconds.
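
The two defaults described here, the dead letter topic name and the fallback ack timeout, can be sketched as plain helper functions. The class DeadLetterSketch and its methods are hypothetical, and treating a value of 0 as "no ack timeout specified" is an assumption made for this sketch only.

```java
// Illustrative only: models the defaults that apply when a dead letter policy is set.
final class DeadLetterSketch {
    static final long DEFAULT_ACK_TIMEOUT_MILLIS = 30_000L;

    /** Uses the custom topic if given, else {TopicName}-{Subscription}-DLQ. */
    static String deadLetterTopic(String topicName, String subscription, String customTopic) {
        if (customTopic != null) {
            return customTopic;
        }
        return topicName + "-" + subscription + "-DLQ";
    }

    /**
     * Returns the ack timeout to use when a dead letter policy is present.
     * Assumption: 0 stands for "no ack timeout was specified".
     */
    static long effectiveAckTimeoutMillis(long configuredAckTimeoutMillis) {
        return configuredAckTimeoutMillis > 0
                ? configuredAckTimeoutMillis
                : DEFAULT_ACK_TIMEOUT_MILLIS;
    }

    public static void main(String[] args) {
        System.out.println(deadLetterTopic("my-topic", "my-sub", null));            // my-topic-my-sub-DLQ
        System.out.println(deadLetterTopic("my-topic", "my-sub", "your-topic-name")); // your-topic-name
        System.out.println(effectiveAckTimeoutMillis(0));                            // 30000
    }
}
```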