Interface ReaderBuilder<T>
- All Superinterfaces:
Cloneable
ReaderBuilder
is used to configure and create instances of Reader
.- Since:
- 2.0.0
- See Also:
-
Method Summary
Modifier and TypeMethodDescriptionautoAckOldestChunkedMessageOnQueueFull
(boolean autoAckOldestChunkedMessageOnQueueFull) Buffering large number of outstanding uncompleted chunked messages can create memory pressure and it can be guarded by providing this @maxPendingChunkedMessage threshold.autoUpdatePartitions
(boolean autoUpdate) If enabled, the reader will auto subscribe for partitions increasement.autoUpdatePartitionsInterval
(int interval, TimeUnit unit) Set the interval of updating partitions (default: 1 minute).clone()
Create a copy of the currentReaderBuilder
.create()
Finalize the creation of theReader
instance.Finalize the creation of theReader
instance in asynchronous mode.Sets theConsumerCryptoFailureAction
to specify.cryptoKeyReader
(CryptoKeyReader cryptoKeyReader) Sets aCryptoKeyReader
to decrypt the message payloads.defaultCryptoKeyReader
(String privateKey) Sets the default implementation ofCryptoKeyReader
.defaultCryptoKeyReader
(Map<String, String> privateKeys) Sets the default implementation ofCryptoKeyReader
.expireTimeOfIncompleteChunkedMessage
(long duration, TimeUnit unit) If producer fails to publish all the chunks of a message then consumer can expire incomplete chunks if consumer won't be able to receive all chunks in expire times (default 1 minute).intercept
(ReaderInterceptor<T>... interceptors) InterceptReader
.keyHashRange
(Range... ranges) Set key hash range of the reader, broker will only dispatch messages which hash of the message key contains by the specified key hash range.Load the configuration from provided config map.maxPendingChunkedMessage
(int maxPendingChunkedMessage) Consumer buffers chunk messages into memory until it receives all the chunks of the original message.messageCrypto
(MessageCrypto messageCrypto) Sets aMessageCrypto
.poolMessages
(boolean poolMessages) Enable pooling of messages and the underlying data buffers.readCompacted
(boolean readCompacted) If enabled, the reader will read messages from the compacted topic rather than reading the full message backlog of the topic.readerListener
(ReaderListener<T> readerListener) Sets aReaderListener
for the reader.readerName
(String readerName) Specify a reader name.receiverQueueSize
(int receiverQueueSize) Sets the size of the consumer receive queue.startMessageFromRollbackDuration
(long rollbackDuration, TimeUnit timeunit) The initial reader positioning can be set at specific timestamp by providing total rollback duration.startMessageId
(MessageId startMessageId) The initial reader positioning is done by specifying a message id.Set the reader to include the given position ofstartMessageId(MessageId)
subscriptionName
(String subscriptionName) Set the subscription name.subscriptionRolePrefix
(String subscriptionRolePrefix) Set the subscription role prefix.Specify the topic this reader will read from.Specify topics this reader will read from.
-
Method Details
-
create
Finalize the creation of theReader
instance.This method will block until the reader is created successfully or an exception is thrown.
- Returns:
- the reader instance
- Throws:
PulsarClientException
- if the reader creation fails
-
createAsync
CompletableFuture<Reader<T>> createAsync()Finalize the creation of theReader
instance in asynchronous mode.This method will return a
CompletableFuture
that can be used to access the instance when it's ready.- Returns:
- the reader instance
- Throws:
PulsarClientException
- if the reader creation fails
-
loadConf
Load the configuration from provided config map.Example:
Map<String, Object> config = new HashMap<>(); config.put("topicName", "test-topic"); config.put("receiverQueueSize", 2000); ReaderBuilder<byte[]> builder = ...; builder = builder.loadConf(config); Reader<byte[]> reader = builder.create();
- Parameters:
config
- configuration to load- Returns:
- the reader builder instance
-
clone
ReaderBuilder<T> clone()Create a copy of the currentReaderBuilder
.Cloning the builder can be used to share an incomplete configuration and specialize it multiple times. For example:
ReaderBuilder<String> builder = client.newReader(Schema.STRING) .readerName("my-reader") .receiverQueueSize(10); Reader<String> reader1 = builder.clone().topic("topic-1").create(); Reader<String> reader2 = builder.clone().topic("topic-2").create();
- Returns:
- a clone of the reader builder instance
-
topic
Specify the topic this reader will read from.This argument is required when constructing the reader.
- Parameters:
topicName
- the name of the topic- Returns:
- the reader builder instance
-
topics
Specify topics this reader will read from.- Parameters:
topicNames
-- Returns:
-
startMessageId
The initial reader positioning is done by specifying a message id. The options are:MessageId.earliest
: Start reading from the earliest message available in the topicMessageId.latest
: Start reading from end of the topic. The first message read will be the one published *after* the creation of the builderMessageId
: Position the reader on a particular message. The first message read will be the one immediately *after* the specified message
If the first message *after* the specified message is not the desired behaviour, use
startMessageIdInclusive()
.- Parameters:
startMessageId
- the message id where the reader will be initially positioned on- Returns:
- the reader builder instance
-
startMessageFromRollbackDuration
The initial reader positioning can be set at specific timestamp by providing total rollback duration. so, broker can find a latest message that was published before given duration.
eg: rollbackDuration in minute = 5 suggests broker to find message which was published 5 mins back and set the inital position on that messageId.- Parameters:
rollbackDuration
- duration which position should be rolled back.- Returns:
-
startMessageIdInclusive
ReaderBuilder<T> startMessageIdInclusive()Set the reader to include the given position ofstartMessageId(MessageId)
This configuration option also applies for any cursor reset operation like
Reader.seek(MessageId)
.- Returns:
- the reader builder instance
-
readerListener
Sets aReaderListener
for the reader.When a
ReaderListener
is set, application will receive messages through it. Calls toReader.readNext()
will not be allowed.- Parameters:
readerListener
- the listener object- Returns:
- the reader builder instance
-
cryptoKeyReader
Sets aCryptoKeyReader
to decrypt the message payloads.- Parameters:
cryptoKeyReader
- CryptoKeyReader object- Returns:
- the reader builder instance
-
defaultCryptoKeyReader
Sets the default implementation ofCryptoKeyReader
.Configure the key reader to be used to decrypt the message payloads.
- Parameters:
privateKey
- the private key that is always used to decrypt message payloads.- Returns:
- the reader builder instance
- Since:
- 2.8.0
-
defaultCryptoKeyReader
Sets the default implementation ofCryptoKeyReader
.Configure the key reader to be used to decrypt the message payloads.
- Parameters:
privateKeys
- the map of private key names and their URIs used to decrypt message payloads.- Returns:
- the reader builder instance
- Since:
- 2.8.0
-
cryptoFailureAction
Sets theConsumerCryptoFailureAction
to specify.- Parameters:
action
- The action to take when the decoding fails- Returns:
- the reader builder instance
-
messageCrypto
Sets aMessageCrypto
.Contains methods to encrypt/decrypt message for End to End Encryption.
- Parameters:
messageCrypto
- message Crypto Object- Returns:
- ReaderBuilder instance
-
receiverQueueSize
Sets the size of the consumer receive queue.The consumer receive queue controls how many messages can be accumulated by the
Consumer
before the application callsConsumer.receive()
. Using a higher value could potentially increase the consumer throughput at the expense of bigger memory utilization.Default value is
1000
messages and should be good for most use cases.- Parameters:
receiverQueueSize
- the new receiver queue size value- Returns:
- the reader builder instance
-
readerName
Specify a reader name.The reader name is purely informational and can used to track a particular reader in the reported stats. By default a randomly generated name is used.
- Parameters:
readerName
- the name to use for the reader- Returns:
- the reader builder instance
-
subscriptionRolePrefix
Set the subscription role prefix. The default prefix is "reader".- Parameters:
subscriptionRolePrefix
-- Returns:
- the reader builder instance
-
subscriptionName
Set the subscription name.If subscriptionRolePrefix is set at the same time, this configuration will prevail
- Parameters:
subscriptionName
-- Returns:
- the reader builder instance
-
readCompacted
If enabled, the reader will read messages from the compacted topic rather than reading the full message backlog of the topic. This means that, if the topic has been compacted, the reader will only see the latest value for each key in the topic, up until the point in the topic message backlog that has been compacted. Beyond that point, the messages will be sent as normal.readCompacted can only be enabled when reading from a persistent topic. Attempting to enable it on non-persistent topics will lead to the reader create call throwing a
PulsarClientException
.- Parameters:
readCompacted
- whether to read from the compacted topic- Returns:
- the reader builder instance
-
keyHashRange
Set key hash range of the reader, broker will only dispatch messages which hash of the message key contains by the specified key hash range. Multiple key hash ranges can be specified on a reader.Total hash range size is 65536, so the max end of the range should be less than or equal to 65535.
- Parameters:
ranges
- key hash ranges for a reader- Returns:
- the reader builder instance
-
poolMessages
Enable pooling of messages and the underlying data buffers. When pooling is enabled, the application is responsible for calling Message.release() after the handling of every received message. If “release()” is not called on a received message, there will be a memory leak. If an application attempts to use and already “released” message, it might experience undefined behavior (for example, memory corruption, deserialization error, etc.). -
autoUpdatePartitions
If enabled, the reader will auto subscribe for partitions increasement. This is only for partitioned reader.- Parameters:
autoUpdate
- whether to auto update partition increasement- Returns:
- the reader builder instance
-
autoUpdatePartitionsInterval
Set the interval of updating partitions (default: 1 minute). This only works if autoUpdatePartitions is enabled.- Parameters:
interval
- the interval of updating partitionsunit
- the time unit of the interval.- Returns:
- the reader builder instance
-
intercept
InterceptReader
.- Parameters:
interceptors
- the list of interceptors to intercept the reader created by this builder.- Returns:
- the reader builder instance
-
maxPendingChunkedMessage
Consumer buffers chunk messages into memory until it receives all the chunks of the original message. While consuming chunk-messages, chunks from same message might not be contiguous in the stream and they might be mixed with other messages' chunks. so, consumer has to maintain multiple buffers to manage chunks coming from different messages. This mainly happens when multiple publishers are publishing messages on the topic concurrently or publisher failed to publish all chunks of the messages.eg: M1-C1, M2-C1, M1-C2, M2-C2 Here, Messages M1-C1 and M1-C2 belong to original message M1, M2-C1 and M2-C2 messages belong to M2 message.
Buffering large number of outstanding uncompleted chunked messages can create memory pressure and it can be guarded by providing this @maxPendingChunkedMessage threshold. Once, consumer reaches this threshold, it drops the outstanding unchunked-messages by silently acking or asking broker to redeliver later by marking it unacked. This behavior can be controlled by configuration: @autoAckOldestChunkedMessageOnQueueFull The default value is 10.- Parameters:
maxPendingChunkedMessage
-- Returns:
-
autoAckOldestChunkedMessageOnQueueFull
ReaderBuilder<T> autoAckOldestChunkedMessageOnQueueFull(boolean autoAckOldestChunkedMessageOnQueueFull) Buffering large number of outstanding uncompleted chunked messages can create memory pressure and it can be guarded by providing this @maxPendingChunkedMessage threshold. Once, consumer reaches this threshold, it drops the outstanding unchunked-messages by silently acking if autoAckOldestChunkedMessageOnQueueFull is true else it marks them for redelivery.- Parameters:
autoAckOldestChunkedMessageOnQueueFull
-- Returns:
-
expireTimeOfIncompleteChunkedMessage
If producer fails to publish all the chunks of a message then consumer can expire incomplete chunks if consumer won't be able to receive all chunks in expire times (default 1 minute).- Parameters:
duration
-unit
-- Returns:
-