Interface ProducerBuilder<T>

All Superinterfaces:
Cloneable

@Public @Stable public interface ProducerBuilder<T> extends Cloneable
ProducerBuilder is used to configure and create instances of Producer.
See Also:
  • Method Details

    • create

      Producer<T> create() throws PulsarClientException
      Finalize the creation of the Producer instance.

      This method will block until the producer is created successfully.

      Returns:
      the producer instance
      Throws:
      PulsarClientException.ProducerBusyException - if a producer with the same "producer name" is already connected to the topic
      PulsarClientException - if the producer creation fails
    • createAsync

      CompletableFuture<Producer<T>> createAsync()
      Finalize the creation of the Producer instance in asynchronous mode.

      This method will return a CompletableFuture that can be used to access the instance when it's ready.

      Returns:
      a future that will yield the created producer instance
      Throws:
      PulsarClientException.ProducerBusyException - if a producer with the same "producer name" is already connected to the topic
      PulsarClientException - if the producer creation fails
    • loadConf

      ProducerBuilder<T> loadConf(Map<String,Object> config)
      Load the configuration from provided config map.

      Example:

      
       Map<String, Object> config = new HashMap<>();
       config.put("producerName", "test-producer");
       config.put("sendTimeoutMs", 2000);
      
       ProducerBuilder<byte[]> builder = client.newProducer()
                        .loadConf(config);
      
       Producer<byte[]> producer = builder.create();
       
      Parameters:
      config - configuration map to load
      Returns:
      the producer builder instance
    • clone

      ProducerBuilder<T> clone()
      Create a copy of the current ProducerBuilder.

      Cloning the builder can be used to share an incomplete configuration and specialize it multiple times. For example:

      
       ProducerBuilder<String> builder = client.newProducer(Schema.STRING)
                        .sendTimeout(10, TimeUnit.SECONDS)
                        .blockIfQueueFull(true);
      
       Producer<String> producer1 = builder.clone().topic("topic-1").create();
       Producer<String> producer2 = builder.clone().topic("topic-2").create();
       
      Returns:
      a clone of the producer builder instance
    • topic

      ProducerBuilder<T> topic(String topicName)
      Specify the topic this producer will be publishing on.

      This argument is required when constructing the produce.

      Parameters:
      topicName - the name of the topic
      Returns:
      the producer builder instance
    • producerName

      ProducerBuilder<T> producerName(String producerName)
      Specify a name for the producer.

      If not assigned, the system will generate a globally unique name which can be accessed with Producer.getProducerName().

      Warning: When specifying a name, it is up to the user to ensure that, for a given topic, the producer name is unique across all Pulsar's clusters. Brokers will enforce that only a single producer a given name can be publishing on a topic.

      Parameters:
      producerName - the custom name to use for the producer
      Returns:
      the producer builder instance
    • accessMode

      ProducerBuilder<T> accessMode(ProducerAccessMode accessMode)
      Configure the type of access mode that the producer requires on the topic.

      Possible values are:

      Parameters:
      accessMode - The type of access to the topic that the producer requires
      Returns:
      the producer builder instance
      See Also:
    • sendTimeout

      ProducerBuilder<T> sendTimeout(int sendTimeout, TimeUnit unit)
      Set the send timeout (default: 30 seconds).

      If a message is not acknowledged by the server before the sendTimeout expires, an error will be reported.

      Setting the timeout to zero, for example setTimeout(0, TimeUnit.SECONDS) will set the timeout to infinity, which can be useful when using Pulsar's message deduplication feature, since the client library will retry forever to publish a message. No errors will be propagated back to the application.

      Parameters:
      sendTimeout - the send timeout
      unit - the time unit of the sendTimeout
      Returns:
      the producer builder instance
    • maxPendingMessages

      ProducerBuilder<T> maxPendingMessages(int maxPendingMessages)
      Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.

      When the queue is full, by default, all calls to Producer.send(T) and Producer.sendAsync(T) will fail unless blockIfQueueFull=true. Use blockIfQueueFull(boolean) to change the blocking behavior.

      The producer queue size also determines the max amount of memory that will be required by the client application. Until, the producer gets a successful acknowledgment back from the broker, it will keep in memory (direct memory pool) all the messages in the pending queue.

      Default is 0, disable the pending messages check.

      Parameters:
      maxPendingMessages - the max size of the pending messages queue for the producer
      Returns:
      the producer builder instance
    • maxPendingMessagesAcrossPartitions

      @Deprecated ProducerBuilder<T> maxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions)
      Deprecated.
      Set the number of max pending messages across all the partitions.

      This setting will be used to lower the max pending messages for each partition (maxPendingMessages(int)), if the total exceeds the configured value. The purpose of this setting is to have an upper-limit on the number of pending messages when publishing on a partitioned topic.

      Default is 0, disable the pending messages across partitions check.

      If publishing at high rate over a topic with many partitions (especially when publishing messages without a partitioning key), it might be beneficial to increase this parameter to allow for more pipelining within the individual partitions producers.

      Parameters:
      maxPendingMessagesAcrossPartitions - max pending messages across all the partitions
      Returns:
      the producer builder instance
    • blockIfQueueFull

      ProducerBuilder<T> blockIfQueueFull(boolean blockIfQueueFull)
      Set whether the Producer.send(T) and Producer.sendAsync(T) operations should block when the outgoing message queue is full.

      Default is false. If set to false, send operations will immediately fail with PulsarClientException.ProducerQueueIsFullError when there is no space left in pending queue. If set to true, the Producer.sendAsync(T) operation will instead block.

      Setting blockIfQueueFull=true simplifies the task of an application that just wants to publish messages as fast as possible, without having to worry about overflowing the producer send queue.

      For example:

      
       Producer<String> producer = client.newProducer()
                        .topic("my-topic")
                        .blockIfQueueFull(true)
                        .create();
      
       while (true) {
           producer.sendAsync("my-message")
                .thenAccept(messageId -> {
                    System.out.println("Published message: " + messageId);
                })
                .exceptionally(ex -> {
                    System.err.println("Failed to publish: " + e);
                    return null;
                });
       }
       
      Parameters:
      blockIfQueueFull - whether to block Producer.send(T) and Producer.sendAsync(T) operations on queue full
      Returns:
      the producer builder instance
    • messageRoutingMode

      ProducerBuilder<T> messageRoutingMode(MessageRoutingMode messageRoutingMode)
      Set the MessageRoutingMode for a partitioned producer.

      Default routing mode is to round-robin across the available partitions.

      This logic is applied when the application is not setting a key on a particular message. If the key is set with MessageBuilder#setKey(String), then the hash of the key will be used to select a partition for the message.

      Parameters:
      messageRoutingMode - the message routing mode
      Returns:
      the producer builder instance
      See Also:
    • hashingScheme

      ProducerBuilder<T> hashingScheme(HashingScheme hashingScheme)
      Change the HashingScheme used to chose the partition on where to publish a particular message.

      Standard hashing functions available are:

      Parameters:
      hashingScheme - the chosen HashingScheme
      Returns:
      the producer builder instance
    • compressionType

      ProducerBuilder<T> compressionType(CompressionType compressionType)
      Set the compression type for the producer.

      By default, message payloads are not compressed. Supported compression types are:

      Parameters:
      compressionType - the selected compression type
      Returns:
      the producer builder instance
    • messageRouter

      ProducerBuilder<T> messageRouter(MessageRouter messageRouter)
      Set a custom message routing policy by passing an implementation of MessageRouter.
      Parameters:
      messageRouter -
      Returns:
      the producer builder instance
    • enableBatching

      ProducerBuilder<T> enableBatching(boolean enableBatching)
      Control whether automatic batching of messages is enabled for the producer. default: enabled

      When batching is enabled, multiple calls to Producer.sendAsync(T) can result in a single batch to be sent to the broker, leading to better throughput, especially when publishing small messages. If compression is enabled, messages will be compressed at the batch level, leading to a much better compression ratio for similar headers or contents.

      When enabled default batch delay is set to 1 ms and default batch size is 1000 messages

      Batching is enabled by default since 2.0.0.

      Returns:
      the producer builder instance
      See Also:
    • enableChunking

      ProducerBuilder<T> enableChunking(boolean enableChunking)
      If message size is higher than allowed max publish-payload size by broker then enableChunking helps producer to split message into multiple chunks and publish them to broker separately and in order. So, it allows client to successfully publish large size of messages in pulsar.

      This feature allows publisher to publish large size of message by splitting it to multiple chunks and let consumer stitch them together to form a original large published message. Therefore, it's necessary to configure recommended configuration at pulsar producer and consumer. Recommendation to use this feature:

       1. This feature is right now only supported by non-shared subscription and persistent-topic.
       2. Disable batching to use chunking feature
       3. Pulsar-client keeps published messages into buffer until it receives ack from broker.
       So, it's better to reduce "maxPendingMessages" size to avoid producer occupying large amount
        of memory by buffered messages.
       4. Set message-ttl on the namespace to cleanup incomplete chunked messages.
       (sometime due to broker-restart or publish time, producer might fail to publish entire large message
       so, consumer will not be able to consume and ack those messages. So, those messages can
       be only discared by msg ttl) Or configure
       ConsumerBuilder.expireTimeOfIncompleteChunkedMessage(long, java.util.concurrent.TimeUnit)
       5. Consumer configuration: consumer should also configure receiverQueueSize and maxPendingChunkedMessage
       
      Parameters:
      enableChunking -
      Returns:
    • chunkMaxMessageSize

      ProducerBuilder<T> chunkMaxMessageSize(int chunkMaxMessageSize)
      Max chunk message size in bytes. Producer chunks the message if chunking is enabled and message size is larger than max chunk-message size. By default chunkMaxMessageSize value is -1 and in that case, producer chunks based on max-message size configured at broker.
      Parameters:
      chunkMaxMessageSize -
      Returns:
    • cryptoKeyReader

      ProducerBuilder<T> cryptoKeyReader(CryptoKeyReader cryptoKeyReader)
      Sets a CryptoKeyReader.

      Configure the key reader to be used to encrypt the message payloads.

      Parameters:
      cryptoKeyReader - CryptoKeyReader object
      Returns:
      the producer builder instance
    • defaultCryptoKeyReader

      ProducerBuilder<T> defaultCryptoKeyReader(String publicKey)
      Sets the default implementation of CryptoKeyReader.

      Configure the key reader to be used to encrypt the message payloads.

      Parameters:
      publicKey - the public key that is always used to encrypt message payloads.
      Returns:
      the producer builder instance
      Since:
      2.8.0
    • defaultCryptoKeyReader

      ProducerBuilder<T> defaultCryptoKeyReader(Map<String,String> publicKeys)
      Sets the default implementation of CryptoKeyReader.

      Configure the key reader to be used to encrypt the message payloads.

      Parameters:
      publicKeys - the map of public key names and their URIs used to encrypt message payloads.
      Returns:
      the producer builder instance
      Since:
      2.8.0
    • addEncryptionKey

      ProducerBuilder<T> addEncryptionKey(String key)
      Add public encryption key, used by producer to encrypt the data key.

      At the time of producer creation, Pulsar client checks if there are keys added to encryptionKeys. If keys are found, a callback CryptoKeyReader.getPrivateKey(String, Map) and CryptoKeyReader.getPublicKey(String, Map) is invoked against each key to load the values of the key. Application should implement this callback to return the key in pkcs8 format. If compression is enabled, message is encrypted after compression. If batch messaging is enabled, the batched message is encrypted.

      Parameters:
      key - the name of the encryption key in the key store
      Returns:
      the producer builder instance
    • cryptoFailureAction

      ProducerBuilder<T> cryptoFailureAction(ProducerCryptoFailureAction action)
      Sets the ProducerCryptoFailureAction to the value specified.
      Parameters:
      action - the action the producer will take in case of encryption failures
      Returns:
      the producer builder instance
    • batchingMaxPublishDelay

      ProducerBuilder<T> batchingMaxPublishDelay(long batchDelay, TimeUnit timeUnit)
      Set the time period within which the messages sent will be batched default: 1 ms if batch messages are enabled. If set to a non zero value, messages will be queued until either:
      • this time interval expires
      • the max number of messages in a batch is reached (batchingMaxMessages(int))
      • the max size of batch is reached

      All messages will be published as a single batch message. The consumer will be delivered individual messages in the batch in the same order they were enqueued.

      Parameters:
      batchDelay - the batch delay
      timeUnit - the time unit of the batchDelay
      Returns:
      the producer builder instance
      See Also:
    • roundRobinRouterBatchingPartitionSwitchFrequency

      ProducerBuilder<T> roundRobinRouterBatchingPartitionSwitchFrequency(int frequency)
      Set the partition switch frequency while batching of messages is enabled and using round-robin routing mode for non-keyed message default: 10.

      The time period of partition switch is frequency * batchingMaxPublishDelay. During this period, all messages arrives will be route to the same partition.

      Parameters:
      frequency - the frequency of partition switch
      Returns:
      the producer builder instance
      See Also:
    • batchingMaxMessages

      ProducerBuilder<T> batchingMaxMessages(int batchMessagesMaxMessagesPerBatch)
      Set the maximum number of messages permitted in a batch. default: 1000 If set to a value greater than 1, messages will be queued until this threshold is reached or batch interval has elapsed.

      All messages in batch will be published as a single batch message. The consumer will be delivered individual messages in the batch in the same order they were enqueued.

      Parameters:
      batchMessagesMaxMessagesPerBatch - maximum number of messages in a batch
      Returns:
      the producer builder instance
      See Also:
    • batchingMaxBytes

      ProducerBuilder<T> batchingMaxBytes(int batchingMaxBytes)
      Set the maximum number of bytes permitted in a batch. default: 128KB If set to a value greater than 0, messages will be queued until this threshold is reached or other batching conditions are met.

      All messages in a batch will be published as a single batched message. The consumer will be delivered individual messages in the batch in the same order they were enqueued.

      Parameters:
      batchingMaxBytes - maximum number of bytes in a batch
      Returns:
      the producer builder instance
      See Also:
    • batcherBuilder

      ProducerBuilder<T> batcherBuilder(BatcherBuilder batcherBuilder)
      Set the batcher builder BatcherBuilder of the producer. Producer will use the batcher builder to build a batch message container.This is only be used when batching is enabled.
      Parameters:
      batcherBuilder - batcher builder
      Returns:
      the producer builder instance
    • initialSequenceId

      ProducerBuilder<T> initialSequenceId(long initialSequenceId)
      Set the baseline for the sequence ids for messages published by the producer.

      First message will be using (initialSequenceId + 1) as its sequence id and subsequent messages will be assigned incremental sequence ids, if not otherwise specified.

      Parameters:
      initialSequenceId - the initial sequence id for the producer
      Returns:
      the producer builder instance
    • property

      ProducerBuilder<T> property(String key, String value)
      Set a name/value property with this producer.

      Properties are application defined metadata that can be attached to the producer. When getting the topic stats, this metadata will be associated to the producer stats for easier identification.

      Parameters:
      key - the property key
      value - the property value
      Returns:
      the producer builder instance
    • properties

      ProducerBuilder<T> properties(Map<String,String> properties)
      Add all the properties in the provided map to the producer.

      Properties are application defined metadata that can be attached to the producer. When getting the topic stats, this metadata will be associated to the producer stats for easier identification.

      Parameters:
      properties - the map of properties
      Returns:
      the producer builder instance
    • intercept

      @Deprecated ProducerBuilder<T> intercept(ProducerInterceptor<T>... interceptors)
      Deprecated.
      Add a set of ProducerInterceptor to the producer.

      Interceptors can be used to trace the publish and acknowledgments operation happening in a producer.

      Parameters:
      interceptors - the list of interceptors to intercept the producer created by this builder.
      Returns:
      the producer builder instance
    • intercept

      ProducerBuilder<T> intercept(ProducerInterceptor... interceptors)
      Add a set of ProducerInterceptor to the producer.

      Interceptors can be used to trace the publish and acknowledgments operation happening in a producer.

      Parameters:
      interceptors - the list of interceptors to intercept the producer created by this builder.
      Returns:
      the producer builder instance
    • autoUpdatePartitions

      ProducerBuilder<T> autoUpdatePartitions(boolean autoUpdate)
      If enabled, partitioned producer will automatically discover new partitions at runtime. This is only applied on partitioned topics.

      Default is true.

      Parameters:
      autoUpdate - whether to auto discover the partition configuration changes
      Returns:
      the producer builder instance
    • autoUpdatePartitionsInterval

      ProducerBuilder<T> autoUpdatePartitionsInterval(int interval, TimeUnit unit)
      Set the interval of updating partitions (default: 1 minute). This only works if autoUpdatePartitions is enabled.
      Parameters:
      interval - the interval of updating partitions
      unit - the time unit of the interval.
      Returns:
      the producer builder instance
    • enableMultiSchema

      ProducerBuilder<T> enableMultiSchema(boolean multiSchema)
      Control whether enable the multiple schema mode for producer. If enabled, producer can send a message with different schema from that specified just when it is created, otherwise a invalid message exception would be threw if the producer want to send a message with different schema.

      Enabled by default.

      Parameters:
      multiSchema - indicates to enable or disable multiple schema mode
      Returns:
      the producer builder instance
      Since:
      2.5.0
    • enableLazyStartPartitionedProducers

      ProducerBuilder<T> enableLazyStartPartitionedProducers(boolean lazyStartPartitionedProducers)
      This config affects Shared mode producers of partitioned topics only. It controls whether producers register and connect immediately to the owner broker of each partition or start lazily on demand. The internal producer of one partition is always started eagerly, chosen by the routing policy, but the internal producers of any additional partitions are started on demand, upon receiving their first message. Using this mode can reduce the strain on brokers for topics with large numbers of partitions and when the SinglePartition or some custom partial partition routing policy like PartialRoundRobinMessageRouterImpl is used without keyed messages. Because producer connection can be on demand, this can produce extra send latency for the first messages of a given partition.
      Parameters:
      lazyStartPartitionedProducers - true/false as to whether to start partition producers lazily
      Returns:
      the producer builder instance