Interface ReaderBuilder<T>

All Superinterfaces:
Cloneable

@Public @Stable public interface ReaderBuilder<T> extends Cloneable
ReaderBuilder is used to configure and create instances of Reader.
Since:
2.0.0
  • Method Details

    • create

      Reader<T> create() throws PulsarClientException
      Finalize the creation of the Reader instance.

      This method will block until the reader is created successfully or an exception is thrown.

      Returns:
      the reader instance
      Throws:
      PulsarClientException - if the reader creation fails
    • createAsync

      CompletableFuture<Reader<T>> createAsync()
      Finalize the creation of the Reader instance in asynchronous mode.

      This method will return a CompletableFuture that can be used to access the instance when it's ready.

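      Returns:
      a CompletableFuture that completes with the reader instance
      Throws:
      PulsarClientException - if the reader creation fails (reported through the returned future, since the signature declares no checked exception)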
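
      A minimal sketch of handling the returned future, assuming a failure surfaces through the future rather than as a thrown exception. It does not use the Pulsar client: the createAsync(boolean) stand-in, the String that plays the role of Reader<T>, and the describe and rootMessage helpers are all hypothetical.

```java
import java.util.concurrent.CompletableFuture;

final class AsyncCreateSketch {
    // Hypothetical stand-in for ReaderBuilder.createAsync(); a String plays the role of Reader<T>.
    static CompletableFuture<String> createAsync(boolean fail) {
        if (fail) {
            return CompletableFuture.failedFuture(new IllegalStateException("reader creation failed"));
        }
        return CompletableFuture.completedFuture("reader");
    }

    // Waits for the future and turns a failure into a description instead of letting it propagate.
    static String describe(CompletableFuture<String> future) {
        return future
                .thenApply(reader -> "created: " + reader)
                .exceptionally(error -> "failed: " + rootMessage(error))
                .join();
    }

    // Unwraps the CompletionException that CompletableFuture adds around the original cause.
    static String rootMessage(Throwable error) {
        Throwable cause = error;
        while (cause.getCause() != null) {
            cause = cause.getCause();
        }
        return cause.getMessage();
    }
}
```

      With a real builder the same chaining would apply to the CompletableFuture<Reader<T>> returned by createAsync().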
    • loadConf

      ReaderBuilder<T> loadConf(Map<String,Object> config)
      Load the configuration from the provided config map.

      Example:

      
       Map<String, Object> config = new HashMap<>();
       config.put("topicName", "test-topic");
       config.put("receiverQueueSize", 2000);
      
       ReaderBuilder<byte[]> builder = ...;
       builder = builder.loadConf(config);
      
       Reader<byte[]> reader = builder.create();
       
      Parameters:
      config - configuration to load
      Returns:
      the reader builder instance
    • clone

      ReaderBuilder<T> clone()
      Create a copy of the current ReaderBuilder.

      Cloning the builder can be used to share an incomplete configuration and specialize it multiple times. For example:

      
       ReaderBuilder<String> builder = client.newReader(Schema.STRING)
                   .readerName("my-reader")
                   .receiverQueueSize(10);
      
       Reader<String> reader1 = builder.clone().topic("topic-1").create();
       Reader<String> reader2 = builder.clone().topic("topic-2").create();
       
      Returns:
      a clone of the reader builder instance
    • topic

      ReaderBuilder<T> topic(String topicName)
      Specify the topic this reader will read from.

      This argument is required when constructing the reader.

      Parameters:
      topicName - the name of the topic
      Returns:
      the reader builder instance
    • topics

      ReaderBuilder<T> topics(List<String> topicNames)
      Specify the topics this reader will read from.
      Parameters:
      topicNames - the names of the topics
      Returns:
      the reader builder instance
    • startMessageId

      ReaderBuilder<T> startMessageId(MessageId startMessageId)
      The initial reader positioning is done by specifying a message id. The options are:
      • MessageId.earliest: Start reading from the earliest message available in the topic
      • MessageId.latest: Start reading from the end of the topic. The first message read will be the one published *after* the creation of the builder
      • MessageId: Position the reader on a particular message. The first message read will be the one immediately *after* the specified message

      If the first message *after* the specified message is not the desired behaviour, use startMessageIdInclusive().

      Parameters:
      startMessageId - the message id where the reader will be initially positioned on
      Returns:
      the reader builder instance
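
      The difference between the default (exclusive) positioning and startMessageIdInclusive() can be sketched with a toy model. This is not the Pulsar client: it assumes message ids are simply the integers 0, 1, 2, ... and the firstIndexRead helper is hypothetical.

```java
import java.util.List;

final class StartPositionSketch {
    // Returns the index of the first message a reader would see in a toy topic
    // whose message ids are the list indexes (a simplification of MessageId).
    // A result equal to topic.size() means "nothing yet, wait for the next publish".
    static int firstIndexRead(List<String> topic, int startId, boolean inclusive) {
        int first = inclusive ? startId : startId + 1;
        return Math.min(first, topic.size());
    }
}
```

      For a topic holding three messages, starting at id 1 yields the message at index 2 by default and the message at index 1 when the start position is inclusive.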
    • startMessageFromRollbackDuration

      ReaderBuilder<T> startMessageFromRollbackDuration(long rollbackDuration, TimeUnit timeunit)
      The initial reader position can be set to a specific point in time by providing a total rollback duration, so that the broker can find the latest message that was published before the given duration.
      For example, a rollbackDuration of 5 with a time unit of minutes asks the broker to find the message that was published 5 minutes ago and to set the initial position on that message id.
      Parameters:
      rollbackDuration - the duration by which the position should be rolled back
      timeunit - the time unit of rollbackDuration
      Returns:
      the reader builder instance
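
      The arithmetic behind a rollback duration can be sketched as follows. The sketch only shows how a duration and a time unit translate into a target publish time; how the broker locates the matching message is not covered here, and the targetPublishTimeMillis helper is hypothetical.

```java
import java.util.concurrent.TimeUnit;

final class RollbackSketch {
    // Converts "roll back by <rollbackDuration> <timeunit>" into an absolute
    // timestamp in milliseconds, relative to the supplied current time.
    static long targetPublishTimeMillis(long nowMillis, long rollbackDuration, TimeUnit timeunit) {
        return nowMillis - timeunit.toMillis(rollbackDuration);
    }
}
```

      Passing the current time as a parameter, instead of reading the system clock inside the helper, keeps the calculation deterministic.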
    • startMessageIdInclusive

      ReaderBuilder<T> startMessageIdInclusive()
      Configure the reader to include the message at the position given by startMessageId(MessageId).

      This configuration option also applies for any cursor reset operation like Reader.seek(MessageId).

      Returns:
      the reader builder instance
    • readerListener

      ReaderBuilder<T> readerListener(ReaderListener<T> readerListener)
      Sets a ReaderListener for the reader.

      When a ReaderListener is set, the application will receive messages through it. Calls to Reader.readNext() will not be allowed.

      Parameters:
      readerListener - the listener object
      Returns:
      the reader builder instance
    • cryptoKeyReader

      ReaderBuilder<T> cryptoKeyReader(CryptoKeyReader cryptoKeyReader)
      Sets a CryptoKeyReader to decrypt the message payloads.
      Parameters:
      cryptoKeyReader - CryptoKeyReader object
      Returns:
      the reader builder instance
    • defaultCryptoKeyReader

      ReaderBuilder<T> defaultCryptoKeyReader(String privateKey)
      Sets the default implementation of CryptoKeyReader.

      Configure the key reader to be used to decrypt the message payloads.

      Parameters:
      privateKey - the private key that is always used to decrypt message payloads.
      Returns:
      the reader builder instance
      Since:
      2.8.0
    • defaultCryptoKeyReader

      ReaderBuilder<T> defaultCryptoKeyReader(Map<String,String> privateKeys)
      Sets the default implementation of CryptoKeyReader.

      Configure the key reader to be used to decrypt the message payloads.

      Parameters:
      privateKeys - the map of private key names and their URIs used to decrypt message payloads.
      Returns:
      the reader builder instance
      Since:
      2.8.0
    • cryptoFailureAction

      ReaderBuilder<T> cryptoFailureAction(ConsumerCryptoFailureAction action)
      Sets the ConsumerCryptoFailureAction that specifies how the reader handles a decryption failure.
      Parameters:
      action - the action to take when decryption fails
      Returns:
      the reader builder instance
    • messageCrypto

      ReaderBuilder<T> messageCrypto(MessageCrypto messageCrypto)
      Sets a MessageCrypto.

      MessageCrypto contains methods to encrypt and decrypt messages for end-to-end encryption.

      Parameters:
      messageCrypto - the MessageCrypto object
      Returns:
      the reader builder instance
    • receiverQueueSize

      ReaderBuilder<T> receiverQueueSize(int receiverQueueSize)
      Sets the size of the consumer receive queue.

      The consumer receive queue controls how many messages can be accumulated by the Consumer before the application calls Consumer.receive(). Using a higher value could potentially increase the consumer throughput at the expense of bigger memory utilization.

      Default value is 1000 messages and should be good for most use cases.

      Parameters:
      receiverQueueSize - the new receiver queue size value
      Returns:
      the reader builder instance
    • readerName

      ReaderBuilder<T> readerName(String readerName)
      Specify a reader name.

      The reader name is purely informational and can be used to track a particular reader in the reported stats. By default, a randomly generated name is used.

      Parameters:
      readerName - the name to use for the reader
      Returns:
      the reader builder instance
    • subscriptionRolePrefix

      ReaderBuilder<T> subscriptionRolePrefix(String subscriptionRolePrefix)
      Set the subscription role prefix. The default prefix is "reader".
      Parameters:
      subscriptionRolePrefix - the subscription role prefix
      Returns:
      the reader builder instance
    • subscriptionName

      ReaderBuilder<T> subscriptionName(String subscriptionName)
      Set the subscription name.

      If subscriptionRolePrefix is set at the same time, this configuration takes precedence.

      Parameters:
      subscriptionName - the subscription name
      Returns:
      the reader builder instance
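
      The precedence between subscriptionName and subscriptionRolePrefix can be sketched as below. This is an illustration only: the resolve helper is hypothetical, and the "prefix-suffix" format of the generated name is an assumption rather than the client's documented naming scheme.

```java
final class SubscriptionNameSketch {
    // Picks the effective subscription name: an explicit name wins; otherwise
    // a name is generated from the prefix, which defaults to "reader".
    // ASSUMPTION: the "<prefix>-<suffix>" layout is for illustration only.
    static String resolve(String subscriptionName, String subscriptionRolePrefix, String randomSuffix) {
        if (subscriptionName != null && !subscriptionName.isEmpty()) {
            return subscriptionName;
        }
        boolean hasPrefix = subscriptionRolePrefix != null && !subscriptionRolePrefix.isEmpty();
        String prefix = hasPrefix ? subscriptionRolePrefix : "reader";
        return prefix + "-" + randomSuffix;
    }
}
```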
    • readCompacted

      ReaderBuilder<T> readCompacted(boolean readCompacted)
      If enabled, the reader will read messages from the compacted topic rather than reading the full message backlog of the topic. This means that, if the topic has been compacted, the reader will only see the latest value for each key in the topic, up until the point in the topic message backlog that has been compacted. Beyond that point, the messages will be sent as normal.

      readCompacted can only be enabled when reading from a persistent topic. Attempting to enable it on non-persistent topics will lead to the reader create call throwing a PulsarClientException.

      Parameters:
      readCompacted - whether to read from the compacted topic
      Returns:
      the reader builder instance
    • keyHashRange

      ReaderBuilder<T> keyHashRange(Range... ranges)
      Set the key hash ranges of the reader. The broker will only dispatch messages whose message key hash falls within one of the specified key hash ranges. Multiple key hash ranges can be specified on a reader.

      The total hash range size is 65536, so the end of a range must be less than or equal to 65535.

      Parameters:
      ranges - key hash ranges for a reader
      Returns:
      the reader builder instance
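
      The range constraint and the matching rule can be sketched as follows. The sketch does not use the Pulsar client: HashRange is a hypothetical stand-in for the Range type in the signature, ranges are assumed to be inclusive on both ends, and mapping a hash into the 65536 slots with a modulo is an assumption. The hash function applied to the message key is not modeled.

```java
final class KeyHashRangeSketch {
    static final int HASH_RANGE_SIZE = 65536;

    // Hypothetical inclusive range [start, end] within 0..65535.
    static final class HashRange {
        final int start;
        final int end;

        HashRange(int start, int end) {
            if (start < 0 || start > end || end >= HASH_RANGE_SIZE) {
                throw new IllegalArgumentException("range must satisfy 0 <= start <= end <= 65535");
            }
            this.start = start;
            this.end = end;
        }

        boolean contains(int slot) {
            return slot >= start && slot <= end;
        }
    }

    // Returns true when the key hash falls inside at least one of the ranges.
    static boolean accepts(int keyHash, HashRange... ranges) {
        int slot = Math.floorMod(keyHash, HASH_RANGE_SIZE); // ASSUMPTION: modulo mapping
        for (HashRange range : ranges) {
            if (range.contains(slot)) {
                return true;
            }
        }
        return false;
    }
}
```

      Splitting the full range into two halves, 0 to 32767 and 32768 to 65535, would let two readers each see a disjoint share of the keys under these assumptions.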
    • poolMessages

      ReaderBuilder<T> poolMessages(boolean poolMessages)
      Enable pooling of messages and the underlying data buffers.

      When pooling is enabled, the application is responsible for calling Message.release() after handling every received message. If release() is not called on a received message, there will be a memory leak. If an application attempts to use an already released message, it might experience undefined behavior (for example, memory corruption or deserialization errors).
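
      One way to make sure every message is released is a try/finally around the handler, sketched below. PooledMessage is a hypothetical stand-in for a pooled Message, and the handle helper is not part of the API. Unlike the real client, where use after release is undefined, the stand-in fails fast so the mistake is visible.

```java
import java.util.function.Consumer;

final class PooledReleaseSketch {
    // Hypothetical stand-in for a pooled message; only release() mirrors the text above.
    static final class PooledMessage {
        private final String value;
        private boolean released;

        PooledMessage(String value) {
            this.value = value;
        }

        String getValue() {
            if (released) {
                throw new IllegalStateException("message already released");
            }
            return value;
        }

        void release() {
            released = true;
        }

        boolean isReleased() {
            return released;
        }
    }

    // Runs the handler and always releases the message, even if the handler throws.
    static void handle(PooledMessage message, Consumer<String> handler) {
        try {
            handler.accept(message.getValue());
        } finally {
            message.release();
        }
    }
}
```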

    • autoUpdatePartitions

      ReaderBuilder<T> autoUpdatePartitions(boolean autoUpdate)
      If enabled, the reader will automatically subscribe to newly added partitions. This only applies to a reader on a partitioned topic.
      Parameters:
      autoUpdate - whether to automatically subscribe to newly added partitions
      Returns:
      the reader builder instance
    • autoUpdatePartitionsInterval

      ReaderBuilder<T> autoUpdatePartitionsInterval(int interval, TimeUnit unit)
      Set the interval of updating partitions (default: 1 minute). This only works if autoUpdatePartitions is enabled.
      Parameters:
      interval - the interval of updating partitions
      unit - the time unit of the interval.
      Returns:
      the reader builder instance
    • intercept

      ReaderBuilder<T> intercept(ReaderInterceptor<T>... interceptors)
      Intercept Reader.
      Parameters:
      interceptors - the list of interceptors to intercept the reader created by this builder.
      Returns:
      the reader builder instance
    • maxPendingChunkedMessage

      ReaderBuilder<T> maxPendingChunkedMessage(int maxPendingChunkedMessage)
      The consumer buffers chunked messages in memory until it receives all the chunks of the original message. While consuming chunked messages, chunks from the same message might not be contiguous in the stream and might be interleaved with chunks of other messages, so the consumer has to maintain multiple buffers to manage chunks coming from different messages. This mainly happens when multiple publishers publish messages on the topic concurrently or when a publisher fails to publish all the chunks of a message.
       For example: M1-C1, M2-C1, M1-C2, M2-C2
       Here, chunks M1-C1 and M1-C2 belong to the original message M1, and chunks M2-C1 and M2-C2 belong to the original message M2.

      Buffering a large number of outstanding incomplete chunked messages can create memory pressure, which can be guarded against by setting the maxPendingChunkedMessage threshold. Once the consumer reaches this threshold, it drops the outstanding incomplete chunked messages, either by silently acknowledging them or by marking them unacknowledged so that the broker redelivers them later. This behavior is controlled by autoAckOldestChunkedMessageOnQueueFull. The default value is 10.
      Parameters:
      maxPendingChunkedMessage - the maximum number of pending chunked messages
      Returns:
      the reader builder instance
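
      The buffering described above can be sketched as a toy model. This is not the client's implementation: ChunkBufferSketch and its members are hypothetical, chunks are plain strings, and evicting the oldest pending message when a new one arrives at the threshold is an assumption. Whether an evicted message is acknowledged or redelivered is not modeled.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ChunkBufferSketch {
    private final int maxPending;
    // Insertion order doubles as age: the first key is the oldest pending message.
    private final Map<String, List<String>> pending = new LinkedHashMap<>();
    final List<String> completed = new ArrayList<>();
    final List<String> dropped = new ArrayList<>();

    ChunkBufferSketch(int maxPending) {
        this.maxPending = maxPending;
    }

    // Buffers one chunk; evicts the oldest pending message if a new message
    // would push the number of pending messages past the threshold.
    void onChunk(String messageId, String chunk, int totalChunks) {
        if (!pending.containsKey(messageId) && pending.size() >= maxPending) {
            Iterator<String> oldest = pending.keySet().iterator();
            dropped.add(oldest.next());
            oldest.remove();
        }
        List<String> chunks = pending.computeIfAbsent(messageId, id -> new ArrayList<>());
        chunks.add(chunk);
        if (chunks.size() == totalChunks) {
            pending.remove(messageId);
            completed.add(messageId + "=" + String.join("", chunks));
        }
    }

    int pendingCount() {
        return pending.size();
    }
}
```

      Feeding the interleaved sequence M1-C1, M2-C1, M1-C2, M2-C2 into a buffer with a threshold of 2 completes both messages, while a threshold of 1 evicts M1 as soon as the first chunk of M2 arrives.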
    • autoAckOldestChunkedMessageOnQueueFull

      ReaderBuilder<T> autoAckOldestChunkedMessageOnQueueFull(boolean autoAckOldestChunkedMessageOnQueueFull)
      Buffering a large number of outstanding incomplete chunked messages can create memory pressure, which can be guarded against by setting the maxPendingChunkedMessage threshold. Once the consumer reaches this threshold, it drops the outstanding incomplete chunked messages by silently acknowledging them if autoAckOldestChunkedMessageOnQueueFull is true; otherwise it marks them for redelivery.
      Parameters:
      autoAckOldestChunkedMessageOnQueueFull - whether to silently acknowledge the oldest incomplete chunked messages when the threshold is reached
      Returns:
      the reader builder instance
    • expireTimeOfIncompleteChunkedMessage

      ReaderBuilder<T> expireTimeOfIncompleteChunkedMessage(long duration, TimeUnit unit)
      If the producer fails to publish all the chunks of a message, the consumer can expire the incomplete chunks when it does not receive all of them within the expiry time (default: 1 minute).
      Parameters:
      duration - the expiry time for incomplete chunked messages
      unit - the time unit of duration
      Returns:
      the reader builder instance