Interface ProducerBuilder<T>

All Superinterfaces:
Cloneable

@Public @Stable public interface ProducerBuilder<T> extends Cloneable
ProducerBuilder is used to configure and create instances of Producer.
See Also:
  • Method Details

    • create

      Producer<T> create() throws PulsarClientException
      Finalize the creation of the Producer instance.

      This method will block until the producer is created successfully.

      Returns:
      the producer instance
      Throws:
      PulsarClientException.ProducerBusyException - if a producer with the same "producer name" is already connected to the topic
      PulsarClientException - if the producer creation fails
    • createAsync

      CompletableFuture<Producer<T>> createAsync()
      Finalize the creation of the Producer instance in asynchronous mode.

      This method will return a CompletableFuture that can be used to access the instance when it's ready.

      Returns:
      a future that will yield the created producer instance
      Throws:
      PulsarClientException.ProducerBusyException - if a producer with the same "producer name" is already connected to the topic
      PulsarClientException - if the producer creation fails
    • loadConf

      ProducerBuilder<T> loadConf(Map<String,Object> config)
      Load the configuration from provided config map.

      Example:

      
       Map<String, Object> config = new HashMap<>();
       config.put("producerName", "test-producer");
       config.put("sendTimeoutMs", 2000);
      
       ProducerBuilder<byte[]> builder = client.newProducer()
                        .loadConf(config);
      
       Producer<byte[]> producer = builder.create();
       
      Parameters:
      config - configuration map to load
      Returns:
      the producer builder instance
    • clone

      ProducerBuilder<T> clone()
      Create a copy of the current ProducerBuilder.

      Cloning the builder can be used to share an incomplete configuration and specialize it multiple times. For example:

      
       ProducerBuilder<String> builder = client.newProducer(Schema.STRING)
                        .sendTimeout(10, TimeUnit.SECONDS)
                        .blockIfQueueFull(true);
      
       Producer<String> producer1 = builder.clone().topic("topic-1").create();
       Producer<String> producer2 = builder.clone().topic("topic-2").create();
       
      Returns:
      a clone of the producer builder instance
    • topic

      ProducerBuilder<T> topic(String topicName)
      Specify the topic this producer will be publishing on.

      This argument is required when constructing the producer.

      Parameters:
      topicName - the name of the topic
      Returns:
      the producer builder instance
    • producerName

      ProducerBuilder<T> producerName(String producerName)
      Specify a name for the producer.

      If not assigned, the system will generate a globally unique name which can be accessed with Producer.getProducerName().

      Warning: When specifying a name, it is up to the user to ensure that, for a given topic, the producer name is unique across all Pulsar's clusters. Brokers will enforce that only a single producer a given name can be publishing on a topic.

      Parameters:
      producerName - the custom name to use for the producer
      Returns:
      the producer builder instance
    • accessMode

      ProducerBuilder<T> accessMode(ProducerAccessMode accessMode)
      Configure the type of access mode that the producer requires on the topic.

      Possible values are:

      Parameters:
      accessMode - The type of access to the topic that the producer requires
      Returns:
      the producer builder instance
      See Also:
    • sendTimeout

      ProducerBuilder<T> sendTimeout(int sendTimeout, TimeUnit unit)
      Set the send timeout (default: 30 seconds).

      If a message is not acknowledged by the server before the sendTimeout expires, an error will be reported.

      Setting the timeout to zero with setTimeout(0, TimeUnit.SECONDS) will set the timeout to infinity. This can be useful when using Pulsar's message deduplication feature, since the client library will retry forever to publish a message. No errors will be propagated back to the application.

      Parameters:
      sendTimeout - the send timeout
      unit - the time unit of the sendTimeout
      Returns:
      the producer builder instance
    • maxPendingMessages

      ProducerBuilder<T> maxPendingMessages(int maxPendingMessages)
      Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.

      When the queue is full, by default, all calls to Producer.send(T) and Producer.sendAsync(T) will fail unless blockIfQueueFull=true. Use blockIfQueueFull(boolean) to change the blocking behavior.

      The producer queue size also determines the max amount of memory that will be required by the client application. Until the producer gets a successful acknowledgment back from the broker, it will keep in memory (direct memory pool) all the messages in the pending queue.

      Default is 0, which disables the pending messages check.

      Parameters:
      maxPendingMessages - the max size of the pending messages queue for the producer
      Returns:
      the producer builder instance
    • maxPendingMessagesAcrossPartitions

      @Deprecated ProducerBuilder<T> maxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions)
      Deprecated.
      Set the number of max pending messages across all partitions.

      This setting will be used to lower the max pending messages for each partition (maxPendingMessages(int)), if the total exceeds the configured value. The purpose of this setting is to have an upper-limit on the number of pending messages when publishing on a partitioned topic.

      Default is 0, which disables the pending messages across partitions check.

      If publishing at a high rate over a topic with many partitions (especially when publishing messages without a partitioning key), it might be beneficial to increase this parameter to allow for more pipelining within the individual partitions' producers.

      Parameters:
      maxPendingMessagesAcrossPartitions - max pending messages across all the partitions
      Returns:
      the producer builder instance
    • blockIfQueueFull

      ProducerBuilder<T> blockIfQueueFull(boolean blockIfQueueFull)
      Set whether the Producer.send(T) and Producer.sendAsync(T) operations should block when the outgoing message queue is full.

      Default is false. If set to false, send operations will immediately fail with PulsarClientException.ProducerQueueIsFullError when there is no space left in pending queue. If set to true, the Producer.sendAsync(T) operation will instead block.

      Setting blockIfQueueFull=true simplifies the task of an application that just wants to publish messages as fast as possible, without having to worry about overflowing the producer send queue.

      For example:

      
       Producer<String> producer = client.newProducer()
                        .topic("my-topic")
                        .blockIfQueueFull(true)
                        .create();
      
       while (true) {
           producer.sendAsync("my-message")
                .thenAccept(messageId -> {
                    System.out.println("Published message: " + messageId);
                })
                .exceptionally(ex -> {
                    System.err.println("Failed to publish: " + e);
                    return null;
                });
       }
       
      Parameters:
      blockIfQueueFull - whether to block Producer.send(T) and Producer.sendAsync(T) operations on queue full
      Returns:
      the producer builder instance
    • messageRoutingMode

      ProducerBuilder<T> messageRoutingMode(MessageRoutingMode messageRoutingMode)
      Set the MessageRoutingMode for a partitioned producer.

      Default routing mode is to round-robin across the available partitions.

      This logic is applied when the application is not setting a key on a particular message. If the key is set with MessageBuilder#setKey(String), then the hash of the key will be used to select a partition for the message.

      Parameters:
      messageRoutingMode - the message routing mode
      Returns:
      the producer builder instance
      See Also:
    • hashingScheme

      ProducerBuilder<T> hashingScheme(HashingScheme hashingScheme)
      Change the HashingScheme used to choose the partition on which to publish a particular message.

      Standard hashing functions available are:

      Parameters:
      hashingScheme - the chosen HashingScheme
      Returns:
      the producer builder instance
    • compressionType

      ProducerBuilder<T> compressionType(CompressionType compressionType)
      Set the compression type for the producer.

      By default, message payloads are not compressed.

      Supported compression types are:

      Parameters:
      compressionType - the selected compression type
      Returns:
      the producer builder instance
    • messageRouter

      ProducerBuilder<T> messageRouter(MessageRouter messageRouter)
      Set a custom message routing policy by passing an implementation of MessageRouter.
      Parameters:
      messageRouter -
      Returns:
      the producer builder instance
    • enableBatching

      ProducerBuilder<T> enableBatching(boolean enableBatching)
      Set automatic batching of messages for the producer. default: enabled

      When batching is enabled, multiple calls to Producer.sendAsync(T) can result in a single batch being sent to the broker, leading to better throughput, especially when publishing small messages. If compression is enabled, messages will be compressed at the batch level, leading to a much better compression ratio for similar headers or contents.

      When enabled, default batch delay is set to 1 ms and default batch size is 1000 messages.

      Batching is enabled by default since 2.0.0.

      Returns:
      the producer builder instance
      See Also:
    • enableChunking

      ProducerBuilder<T> enableChunking(boolean enableChunking)
      If a message's size is higher than the broker's allowed max publish-payload size, enableChunking allows the producer to split the message into multiple chunks and publish it to the broker separately and in order.

      This feature allows the publishing of large messages by splitting messages into multiple chunks and re-assembling them with the consumer to form the original large published message. Therefore, this configuration of the pulsar producer and consumer is recommended to use this feature:

       1. This feature is currently only supported for non-shared subscriptions and persistent topics.
       2. Disable batching to use chunking feature.
       3. Pulsar-client stores published messages in buffer cache until it receives acknowledgement from the broker.
       Therefore, it's best practice to reduce the "maxPendingMessages" size to avoid the producer occupying large
       amounts of memory with buffered messages.
       4. Set message-ttl on the namespace to clean up incomplete chunked messages.
       (If a producer fails to publish an entire large message, the consumer will be unable to consume and acknowledge
       those messages. These messages can only be discarded by message TTL or by configuring
       ConsumerBuilder.expireTimeOfIncompleteChunkedMessage(long, java.util.concurrent.TimeUnit).
       5. Consumer configuration: consumer should also configure receiverQueueSize and maxPendingChunkedMessage
       
      Parameters:
      enableChunking -
      Returns:
    • chunkMaxMessageSize

      ProducerBuilder<T> chunkMaxMessageSize(int chunkMaxMessageSize)
      Max chunk-message size in bytes. Producer chunks the message if chunking is enabled and message size is larger than max chunk-message size. By default, chunkMaxMessageSize value is -1 and the producer chunks based on the max-message size configured on the broker.
      Parameters:
      chunkMaxMessageSize -
      Returns:
    • cryptoKeyReader

      ProducerBuilder<T> cryptoKeyReader(CryptoKeyReader cryptoKeyReader)
      Sets a CryptoKeyReader.

      Configure the key reader to be used to encrypt the message payloads.

      Parameters:
      cryptoKeyReader - CryptoKeyReader object
      Returns:
      the producer builder instance
    • defaultCryptoKeyReader

      ProducerBuilder<T> defaultCryptoKeyReader(String publicKey)
      Sets the default implementation of CryptoKeyReader.

      Configure the key reader to be used to encrypt the message payloads.

      Parameters:
      publicKey - the public key that is always used to encrypt message payloads.
      Returns:
      the producer builder instance
      Since:
      2.8.0
    • defaultCryptoKeyReader

      ProducerBuilder<T> defaultCryptoKeyReader(Map<String,String> publicKeys)
      Sets the default implementation of CryptoKeyReader.

      Configure the key reader to be used to encrypt the message payloads.

      Parameters:
      publicKeys - the map of public key names and their URIs used to encrypt message payloads.
      Returns:
      the producer builder instance
      Since:
      2.8.0
    • messageCrypto

      ProducerBuilder<T> messageCrypto(MessageCrypto messageCrypto)
      Sets a MessageCrypto.

      Contains methods to encrypt/decrypt messages for end-to-end encryption.

      Parameters:
      messageCrypto - MessageCrypto object
      Returns:
      the producer builder instance
    • addEncryptionKey

      ProducerBuilder<T> addEncryptionKey(String key)
      Add public encryption key, used by producer to encrypt the data key.

      At the time of producer creation, the Pulsar client checks if there are keys added to encryptionKeys. If keys are found, a callback CryptoKeyReader.getPrivateKey(String, Map) and CryptoKeyReader.getPublicKey(String, Map) is invoked against each key to load the values of the key.

      Application should implement this callback to return the key in pkcs8 format. If compression is enabled, message is encrypted after compression. If batch messaging is enabled, the batched message is encrypted.

      Parameters:
      key - the name of the encryption key in the key store
      Returns:
      the producer builder instance
    • cryptoFailureAction

      ProducerBuilder<T> cryptoFailureAction(ProducerCryptoFailureAction action)
      Set the ProducerCryptoFailureAction to the value specified.
      Parameters:
      action - the action the producer will take in case of encryption failures
      Returns:
      the producer builder instance
    • batchingMaxPublishDelay

      ProducerBuilder<T> batchingMaxPublishDelay(long batchDelay, TimeUnit timeUnit)
      Set the time period within which messages sent will be batched if batch messages are enabled. The default value is 1 ms. If set to a non-zero value, messages will be queued until either:
      • this time interval expires
      • the max number of messages in a batch is reached (batchingMaxMessages(int))
      • the max size of batch is reached

      All messages will be published as a single batch message. The consumer will be delivered individual messages in the batch in the same order they were enqueued.

      Parameters:
      batchDelay - the batch delay
      timeUnit - the time unit of the batchDelay
      Returns:
      the producer builder instance
      See Also:
    • roundRobinRouterBatchingPartitionSwitchFrequency

      ProducerBuilder<T> roundRobinRouterBatchingPartitionSwitchFrequency(int frequency)
      Set the partition switch frequency while batching of messages is enabled and using round-robin routing mode for non-keyed messages. Default: 10.

      The time period of partition switch is frequency * batchingMaxPublishDelay. During this period, all arriving messages will be routed to the same partition.

      Parameters:
      frequency - the frequency of partition switch
      Returns:
      the producer builder instance
      See Also:
    • batchingMaxMessages

      ProducerBuilder<T> batchingMaxMessages(int batchMessagesMaxMessagesPerBatch)
      Set the maximum number of messages permitted in a batch. The default value is 1000. If set to a value greater than 1, messages will be queued until this threshold is reached or batch interval has elapsed.

      All messages in batch will be published as a single batch message. The consumer will be delivered individual messages in the batch in the same order they were enqueued.

      Parameters:
      batchMessagesMaxMessagesPerBatch - maximum number of messages in a batch
      Returns:
      the producer builder instance
      See Also:
    • batchingMaxBytes

      ProducerBuilder<T> batchingMaxBytes(int batchingMaxBytes)
      Set the maximum number of bytes permitted in a batch. The default value is 128KB. If set to a value greater than 0, messages will be queued until this threshold is reached or other batching conditions are met.

      All messages in a batch will be published as a single batched message. The consumer will be delivered individual messages in the batch in the same order they were enqueued.

      Parameters:
      batchingMaxBytes - maximum number of bytes in a batch
      Returns:
      the producer builder instance
      See Also:
    • batcherBuilder

      ProducerBuilder<T> batcherBuilder(BatcherBuilder batcherBuilder)
      Set the batcher builder BatcherBuilder of the producer. Producer will use the batcher builder to build a batch message container.This is only be used when batching is enabled.
      Parameters:
      batcherBuilder - batcher builder
      Returns:
      the producer builder instance
    • initialSequenceId

      ProducerBuilder<T> initialSequenceId(long initialSequenceId)
      Set the baseline for sequence ids for messages published by the producer.

      First message will use (initialSequenceId + 1) as its sequence id, and subsequent messages will be assigned incremental sequence ids, if not otherwise specified.

      Parameters:
      initialSequenceId - the initial sequence id for the producer
      Returns:
      the producer builder instance
    • property

      ProducerBuilder<T> property(String key, String value)
      Set a name/value property for this producer.

      Properties are application-defined metadata that can be attached to the producer. When getting topic stats, this metadata will be associated to the producer stats for easier identification.

      Parameters:
      key - the property key
      value - the property value
      Returns:
      the producer builder instance
    • properties

      ProducerBuilder<T> properties(Map<String,String> properties)
      Add all the properties in the provided map to the producer.

      Properties are application-defined metadata that can be attached to the producer. When getting topic stats, this metadata will be associated to the producer stats for easier identification.

      Parameters:
      properties - the map of properties
      Returns:
      the producer builder instance
    • intercept

      @Deprecated ProducerBuilder<T> intercept(ProducerInterceptor<T>... interceptors)
      Deprecated.
      Add a set of ProducerInterceptors to the producer.

      Interceptors can be used to trace the publish and acknowledgments operations happening in a producer.

      Parameters:
      interceptors - the list of interceptors to intercept the producer created by this builder.
      Returns:
      the producer builder instance
    • intercept

      ProducerBuilder<T> intercept(ProducerInterceptor... interceptors)
      Add a set of ProducerInterceptor to the producer.

      Interceptors can be used to trace the publish and acknowledgments operation happening in a producer.

      Parameters:
      interceptors - the list of interceptors to intercept the producer created by this builder.
      Returns:
      the producer builder instance
    • autoUpdatePartitions

      ProducerBuilder<T> autoUpdatePartitions(boolean autoUpdate)
      If enabled, partitioned producer will automatically discover new partitions at runtime. This is only applied on partitioned topics.

      Default is true.

      Parameters:
      autoUpdate - whether to auto discover partition configuration changes
      Returns:
      the producer builder instance
    • autoUpdatePartitionsInterval

      ProducerBuilder<T> autoUpdatePartitionsInterval(int interval, TimeUnit unit)
      Set the interval of updating partitions. The default value is 1 minute. This only works if autoUpdatePartitions is enabled.
      Parameters:
      interval - the interval of updating partitions
      unit - the time unit of the interval.
      Returns:
      the producer builder instance
    • enableMultiSchema

      ProducerBuilder<T> enableMultiSchema(boolean multiSchema)
      Set the multiple schema mode for producer. If enabled, the producer can send a message with a schema different from the schema specified at creation.

      >Otherwise, if the producer wanted to send a message with different schema, an invalid message exception would be thrown

      Enabled by default.

      Parameters:
      multiSchema - enable or disable multiple schema mode
      Returns:
      the producer builder instance
      Since:
      2.5.0
    • enableLazyStartPartitionedProducers

      ProducerBuilder<T> enableLazyStartPartitionedProducers(boolean lazyStartPartitionedProducers)
      This config affects Shared mode producers of partitioned topics only. It controls whether producers register and connect immediately to the owner broker of each partition or start lazily on demand. The internal producer of one partition always starts immediately, as chosen by the routing policy, but the internal producers of any additional partitions are started on demand upon receiving their first message. Using this mode can reduce the strain on brokers for topics with large numbers of partitions and when the SinglePartition or some custom partial partition routing policy like PartialRoundRobinMessageRouterImpl is used without keyed messages. Because producer connection can be on demand, this can produce extra send latency for the first messages of a given partition.
      Parameters:
      lazyStartPartitionedProducers - enable or disable starting partition producers lazily
      Returns:
      the producer builder instance