Interface ProducerBuilder<T>
- All Superinterfaces:
Cloneable
ProducerBuilder is used to configure and create instances of Producer.
Method Summary
Method - Description
accessMode(ProducerAccessMode accessMode) - Configure the type of access mode that the producer requires on the topic.
addEncryptionKey(String key) - Add public encryption key, used by producer to encrypt the data key.
autoUpdatePartitions(boolean autoUpdate) - If enabled, partitioned producer will automatically discover new partitions at runtime.
autoUpdatePartitionsInterval(int interval, TimeUnit unit) - Set the interval of updating partitions.
batcherBuilder(BatcherBuilder batcherBuilder) - Set the batcher builder BatcherBuilder of the producer.
batchingMaxBytes(int batchingMaxBytes) - Set the maximum number of bytes permitted in a batch.
batchingMaxMessages(int batchMessagesMaxMessagesPerBatch) - Set the maximum number of messages permitted in a batch.
batchingMaxPublishDelay(long batchDelay, TimeUnit timeUnit) - Set the time period within which messages sent will be batched if batch messages are enabled.
blockIfQueueFull(boolean blockIfQueueFull) - Set whether the Producer.send(T) and Producer.sendAsync(T) operations should block when the outgoing message queue is full.
chunkMaxMessageSize(int chunkMaxMessageSize) - Max chunk-message size in bytes.
clone() - Create a copy of the current ProducerBuilder.
compressionType(CompressionType compressionType) - Set the compression type for the producer.
create() - Finalize the creation of the Producer instance.
createAsync() - Finalize the creation of the Producer instance in asynchronous mode.
cryptoFailureAction(ProducerCryptoFailureAction action) - Set the ProducerCryptoFailureAction to the value specified.
cryptoKeyReader(CryptoKeyReader cryptoKeyReader) - Sets a CryptoKeyReader.
defaultCryptoKeyReader(String publicKey) - Sets the default implementation of CryptoKeyReader.
defaultCryptoKeyReader(Map<String, String> publicKeys) - Sets the default implementation of CryptoKeyReader.
enableBatching(boolean enableBatching) - Set automatic batching of messages for the producer.
enableChunking(boolean enableChunking) - If a message's size is higher than the broker's allowed max publish-payload size, enableChunking allows the producer to split the message into multiple chunks and publish it to the broker separately and in order.
enableLazyStartPartitionedProducers(boolean lazyStartPartitionedProducers) - This config affects Shared mode producers of partitioned topics only.
enableMultiSchema(boolean multiSchema) - Set the multiple schema mode for producer.
hashingScheme(HashingScheme hashingScheme) - Change the HashingScheme used to choose the partition on which to publish a particular message.
initialSequenceId(long initialSequenceId) - Set the baseline for sequence ids for messages published by the producer.
intercept(ProducerInterceptor... interceptors) - Add a set of ProducerInterceptor to the producer.
intercept(ProducerInterceptor<T>... interceptors) - Deprecated.
loadConf(Map<String, Object> config) - Load the configuration from provided config map.
maxPendingMessages(int maxPendingMessages) - Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.
maxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) - Deprecated.
messageCrypto(MessageCrypto messageCrypto) - Sets a MessageCrypto.
messageRouter(MessageRouter messageRouter) - Set a custom message routing policy by passing an implementation of MessageRouter.
messageRoutingMode(MessageRoutingMode messageRoutingMode) - Set the MessageRoutingMode for a partitioned producer.
producerName(String producerName) - Specify a name for the producer.
properties(Map<String, String> properties) - Add all the properties in the provided map to the producer.
property(String key, String value) - Set a name/value property for this producer.
roundRobinRouterBatchingPartitionSwitchFrequency(int frequency) - Set the partition switch frequency while batching of messages is enabled and using round-robin routing mode for non-keyed messages.
sendTimeout(int sendTimeout, TimeUnit unit) - Set the send timeout (default: 30 seconds).
topic(String topicName) - Specify the topic this producer will be publishing on.
-
Method Details
-
create
Finalize the creation of the Producer instance.
This method will block until the producer is created successfully.
- Returns:
- the producer instance
- Throws:
PulsarClientException.ProducerBusyException - if a producer with the same "producer name" is already connected to the topic
PulsarClientException - if the producer creation fails
-
createAsync
CompletableFuture<Producer<T>> createAsync()
Finalize the creation of the Producer instance in asynchronous mode.
This method will return a CompletableFuture that can be used to access the instance when it's ready.
- Returns:
- a future that will yield the created producer instance
- Throws:
PulsarClientException.ProducerBusyException - if a producer with the same "producer name" is already connected to the topic
PulsarClientException - if the producer creation fails
-
loadConf
Load the configuration from provided config map.
Example:
    Map<String, Object> config = new HashMap<>();
    config.put("producerName", "test-producer");
    config.put("sendTimeoutMs", 2000);

    ProducerBuilder<byte[]> builder = client.newProducer()
            .loadConf(config);

    Producer<byte[]> producer = builder.create();
- Parameters:
config - configuration map to load
- Returns:
- the producer builder instance
-
clone
ProducerBuilder<T> clone()
Create a copy of the current ProducerBuilder.
Cloning the builder can be used to share an incomplete configuration and specialize it multiple times. For example:
    ProducerBuilder<String> builder = client.newProducer(Schema.STRING)
            .sendTimeout(10, TimeUnit.SECONDS)
            .blockIfQueueFull(true);

    Producer<String> producer1 = builder.clone().topic("topic-1").create();
    Producer<String> producer2 = builder.clone().topic("topic-2").create();
- Returns:
- a clone of the producer builder instance
-
topic
Specify the topic this producer will be publishing on.
This argument is required when constructing the producer.
- Parameters:
topicName - the name of the topic
- Returns:
- the producer builder instance
-
producerName
Specify a name for the producer.
If not assigned, the system will generate a globally unique name which can be accessed with Producer.getProducerName().
Warning: When specifying a name, it is up to the user to ensure that, for a given topic, the producer name is unique across all Pulsar clusters. Brokers will enforce that only a single producer with a given name can be publishing on a topic.
- Parameters:
producerName - the custom name to use for the producer
- Returns:
- the producer builder instance
-
accessMode
Configure the type of access mode that the producer requires on the topic.
Possible values are:
- ProducerAccessMode.Shared: By default multiple producers can publish on a topic.
- ProducerAccessMode.Exclusive: Require exclusive access for producer. Fail immediately if there's already a producer connected.
- ProducerAccessMode.ExclusiveWithFencing: Require exclusive access for the producer. Any existing producer will be removed and invalidated immediately.
- ProducerAccessMode.WaitForExclusive: Producer creation is pending until it can acquire exclusive access.
- Parameters:
accessMode - The type of access to the topic that the producer requires
- Returns:
- the producer builder instance
-
sendTimeout
Set the send timeout (default: 30 seconds).
If a message is not acknowledged by the server before the sendTimeout expires, an error will be reported.
Setting the timeout to zero with sendTimeout(0, TimeUnit.SECONDS) will set the timeout to infinity. This can be useful when using Pulsar's message deduplication feature, since the client library will retry forever to publish a message. No errors will be propagated back to the application.
- Parameters:
sendTimeout - the send timeout
unit - the time unit of the sendTimeout
- Returns:
- the producer builder instance
-
maxPendingMessages
Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.
When the queue is full, by default, all calls to Producer.send(T) and Producer.sendAsync(T) will fail unless blockIfQueueFull=true. Use blockIfQueueFull(boolean) to change the blocking behavior.
The producer queue size also determines the max amount of memory that will be required by the client application. Until the producer gets a successful acknowledgment back from the broker, it will keep in memory (direct memory pool) all the messages in the pending queue.
Default is 0, which disables the pending messages check.
- Parameters:
maxPendingMessages - the max size of the pending messages queue for the producer
- Returns:
- the producer builder instance
-
maxPendingMessagesAcrossPartitions
@Deprecated ProducerBuilder<T> maxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions)
Deprecated.
Set the number of max pending messages across all partitions.
This setting will be used to lower the max pending messages for each partition (maxPendingMessages(int)), if the total exceeds the configured value. The purpose of this setting is to have an upper-limit on the number of pending messages when publishing on a partitioned topic.
Default is 0, which disables the pending messages across partitions check.
If publishing at a high rate over a topic with many partitions (especially when publishing messages without a partitioning key), it might be beneficial to increase this parameter to allow for more pipelining within the individual partitions' producers.
- Parameters:
maxPendingMessagesAcrossPartitions - max pending messages across all the partitions
- Returns:
- the producer builder instance
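The way a global cap lowers the per-partition limit can be sketched with plain arithmetic. The class below is a hypothetical model, not the Pulsar client's code: it assumes the cap is shared equally across partitions and that a non-positive cap means the check is disabled.

```java
// Hypothetical model of how a cap across partitions can lower the
// per-partition pending-message limit. Not the Pulsar implementation.
class PendingLimitModel {

    // Assumption: each partition gets at most an equal share of the global cap.
    static int perPartitionLimit(int maxPendingMessages,
                                 int maxPendingMessagesAcrossPartitions,
                                 int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        if (maxPendingMessagesAcrossPartitions <= 0) {
            // Assumption: a non-positive cap disables the check.
            return maxPendingMessages;
        }
        int share = maxPendingMessagesAcrossPartitions / numPartitions;
        return Math.min(maxPendingMessages, share);
    }

    public static void main(String[] args) {
        // With 100 partitions the equal share is smaller than the per-partition setting.
        System.out.println(perPartitionLimit(1000, 50_000, 100));
        // With 10 partitions the per-partition setting is the tighter bound.
        System.out.println(perPartitionLimit(1000, 50_000, 10));
    }
}
```

Under this model, adding partitions shrinks each partition's share, which is why raising the cap can help on topics with many partitions.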
-
blockIfQueueFull
Set whether the Producer.send(T) and Producer.sendAsync(T) operations should block when the outgoing message queue is full.
Default is false. If set to false, send operations will immediately fail with PulsarClientException.ProducerQueueIsFullError when there is no space left in pending queue. If set to true, the Producer.sendAsync(T) operation will instead block.
Setting blockIfQueueFull=true simplifies the task of an application that just wants to publish messages as fast as possible, without having to worry about overflowing the producer send queue.
For example:
    Producer<String> producer = client.newProducer(Schema.STRING)
            .topic("my-topic")
            .blockIfQueueFull(true)
            .create();

    while (true) {
        producer.sendAsync("my-message")
                .thenAccept(messageId -> {
                    System.out.println("Published message: " + messageId);
                })
                .exceptionally(ex -> {
                    System.err.println("Failed to publish: " + ex);
                    return null;
                });
    }
- Parameters:
blockIfQueueFull - whether to block Producer.send(T) and Producer.sendAsync(T) operations on queue full
- Returns:
- the producer builder instance
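The difference between failing fast and blocking on a full pending queue can be illustrated with a bounded queue from the JDK. This is a minimal sketch of the behavior described above, assuming a hypothetical PendingQueueModel class; it does not use or reproduce the Pulsar client.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical model of a producer's pending-message queue.
class PendingQueueModel {
    private final BlockingQueue<String> pending;
    private final boolean blockIfQueueFull;

    PendingQueueModel(int maxPendingMessages, boolean blockIfQueueFull) {
        this.pending = new ArrayBlockingQueue<>(maxPendingMessages);
        this.blockIfQueueFull = blockIfQueueFull;
    }

    // Returns true if the message was queued. A false result models the
    // immediate queue-full failure seen when blockIfQueueFull is false.
    boolean enqueue(String message) {
        if (!blockIfQueueFull) {
            return pending.offer(message); // never waits
        }
        try {
            pending.put(message); // waits until a slot is free
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for space", e);
        }
    }

    // Models a broker acknowledgment freeing the oldest pending slot.
    String acknowledgeOldest() {
        return pending.poll();
    }

    public static void main(String[] args) {
        PendingQueueModel queue = new PendingQueueModel(2, false);
        System.out.println(queue.enqueue("m1"));
        System.out.println(queue.enqueue("m2"));
        System.out.println(queue.enqueue("m3")); // queue is full at this point
    }
}
```

With blockIfQueueFull set to true in this model, the third enqueue call would wait until acknowledgeOldest() is called from another thread.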
-
messageRoutingMode
Set the MessageRoutingMode for a partitioned producer.
Default routing mode is to round-robin across the available partitions.
This logic is applied when the application is not setting a key on a particular message. If the key is set with MessageBuilder#setKey(String), then the hash of the key will be used to select a partition for the message.
- Parameters:
messageRoutingMode - the message routing mode
- Returns:
- the producer builder instance
-
hashingScheme
Change the HashingScheme used to choose the partition on which to publish a particular message.
Standard hashing functions available are:
- HashingScheme.JavaStringHash: Java String.hashCode() (Default)
- HashingScheme.Murmur3_32Hash: Use Murmur3 hashing function. https://en.wikipedia.org/wiki/MurmurHash
- Parameters:
hashingScheme - the chosen HashingScheme
- Returns:
- the producer builder instance
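How a key hash can map to a partition is sketched below for the String.hashCode() case. The KeyPartitioner class and the "clear the sign bit, then take the remainder" rule are assumptions made for illustration; the client's actual partition selection may differ in detail.

```java
// Hypothetical sketch: map a message key to a partition index using
// String.hashCode(). Not the Pulsar client's own routing code.
class KeyPartitioner {

    static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // Clear the sign bit so the remainder is never negative.
        int hash = key.hashCode() & Integer.MAX_VALUE;
        return hash % numPartitions;
    }

    public static void main(String[] args) {
        // The same key always lands on the same partition.
        System.out.println(partitionFor("order-42", 8));
        System.out.println(partitionFor("order-42", 8));
    }
}
```

The property that matters for ordering is determinism: as long as the partition count is unchanged, equal keys map to the same partition.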
-
compressionType
Set the compression type for the producer.
By default, message payloads are not compressed.
Supported compression types are:
- CompressionType.NONE: No compression (default)
- CompressionType.LZ4: Compress with LZ4 algorithm. Faster but lower compression than ZLib.
- CompressionType.ZLIB: Standard ZLib compression.
- CompressionType.ZSTD: Compress with Zstd codec. Available since Pulsar 2.3; Zstd can only be used if consumer applications are also on version 2.3 or later.
- CompressionType.SNAPPY: Compress with Snappy codec. Available since Pulsar 2.4; Snappy can only be used if consumer applications are also on version 2.4 or later.
- Parameters:
compressionType - the selected compression type
- Returns:
- the producer builder instance
-
messageRouter
Set a custom message routing policy by passing an implementation of MessageRouter.
- Parameters:
messageRouter - the custom MessageRouter implementation
- Returns:
- the producer builder instance
-
enableBatching
Set automatic batching of messages for the producer. Default: enabled.
When batching is enabled, multiple calls to Producer.sendAsync(T) can result in a single batch being sent to the broker, leading to better throughput, especially when publishing small messages. If compression is enabled, messages will be compressed at the batch level, leading to a much better compression ratio for similar headers or contents.
When enabled, default batch delay is set to 1 ms and default batch size is 1000 messages.
Batching is enabled by default since 2.0.0.
- Returns:
- the producer builder instance
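The claim that batch-level compression gives a better ratio for similar messages can be checked with the JDK's zlib implementation. The sketch below uses java.util.zip.Deflater as a stand-in codec and made-up JSON payloads; it does not involve the Pulsar client or its batch format.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.Deflater;

// Compares compressing many similar messages one by one against
// compressing them as one concatenated batch.
class BatchCompressionDemo {

    // Made-up payload: small JSON documents that differ only in an id.
    static byte[] message(int i) {
        String json = "{\"sensor\":\"s-" + i + "\",\"status\":\"ok\"}";
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // Number of bytes produced by zlib for the given input.
    static int deflatedSize(byte[] input) {
        Deflater deflater = new Deflater();
        deflater.setInput(input);
        deflater.finish();
        byte[] buffer = new byte[1024];
        int total = 0;
        while (!deflater.finished()) {
            total += deflater.deflate(buffer);
        }
        deflater.end();
        return total;
    }

    static int individualSize(int count) {
        int total = 0;
        for (int i = 0; i < count; i++) {
            total += deflatedSize(message(i));
        }
        return total;
    }

    static int batchedSize(int count) {
        ByteArrayOutputStream batch = new ByteArrayOutputStream();
        for (int i = 0; i < count; i++) {
            byte[] m = message(i);
            batch.write(m, 0, m.length);
        }
        return deflatedSize(batch.toByteArray());
    }

    public static void main(String[] args) {
        System.out.println("individual: " + individualSize(100) + " bytes");
        System.out.println("batched:    " + batchedSize(100) + " bytes");
    }
}
```

The batched figure comes out smaller because the compressor can reuse the repeated field names across messages, while each individually compressed message pays its own header overhead and has little redundancy to exploit.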
-
enableChunking
If a message's size is higher than the broker's allowed max publish-payload size, enableChunking allows the producer to split the message into multiple chunks and publish them to the broker separately and in order.
This feature allows the publishing of large messages by splitting them into multiple chunks and re-assembling them at the consumer to form the original large published message. The following producer and consumer configuration is recommended when using this feature:
1. This feature is currently only supported for non-shared subscriptions and persistent topics.
2. Disable batching to use the chunking feature.
3. Pulsar-client stores published messages in buffer cache until it receives acknowledgement from the broker. Therefore, it's best practice to reduce the "maxPendingMessages" size to avoid the producer occupying large amounts of memory with buffered messages.
4. Set message-ttl on the namespace to clean up incomplete chunked messages. (If a producer fails to publish an entire large message, the consumer will be unable to consume and acknowledge those messages. These messages can only be discarded by message TTL or by configuring ConsumerBuilder.expireTimeOfIncompleteChunkedMessage(long, java.util.concurrent.TimeUnit).)
5. Consumer configuration: the consumer should also configure receiverQueueSize and maxPendingChunkedMessage.
- Parameters:
enableChunking - whether to enable chunking
- Returns:
- the producer builder instance
chunkMaxMessageSize
Max chunk-message size in bytes. The producer chunks the message if chunking is enabled and the message size is larger than the max chunk-message size. By default, the chunkMaxMessageSize value is -1 and the producer chunks based on the max-message size configured on the broker.
- Parameters:
chunkMaxMessageSize - the max chunk-message size in bytes
- Returns:
- the producer builder instance
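Splitting a payload into ordered chunks of a bounded size can be sketched as follows. ChunkSplitter is a hypothetical helper written for illustration; it ignores the per-chunk metadata a real client would add, so actual chunk boundaries may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that splits a payload into ordered chunks.
class ChunkSplitter {

    static List<byte[]> split(byte[] payload, int chunkMaxMessageSize) {
        if (chunkMaxMessageSize <= 0) {
            throw new IllegalArgumentException("chunkMaxMessageSize must be positive");
        }
        List<byte[]> chunks = new ArrayList<>();
        for (int offset = 0; offset < payload.length; offset += chunkMaxMessageSize) {
            int end = Math.min(payload.length, offset + chunkMaxMessageSize);
            // Chunks are added in order so they can be re-assembled by position.
            chunks.add(Arrays.copyOfRange(payload, offset, end));
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<byte[]> chunks = split(new byte[10], 4);
        for (byte[] chunk : chunks) {
            System.out.println("chunk of " + chunk.length + " bytes");
        }
    }
}
```

Only the last chunk can be shorter than the limit; a payload that already fits yields a single chunk.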
cryptoKeyReader
Sets a CryptoKeyReader.
Configure the key reader to be used to encrypt the message payloads.
- Parameters:
cryptoKeyReader - CryptoKeyReader object
- Returns:
- the producer builder instance
-
defaultCryptoKeyReader
Sets the default implementation of CryptoKeyReader.
Configure the key reader to be used to encrypt the message payloads.
- Parameters:
publicKey - the public key that is always used to encrypt message payloads.
- Returns:
- the producer builder instance
- Since:
- 2.8.0
-
defaultCryptoKeyReader
Sets the default implementation of CryptoKeyReader.
Configure the key reader to be used to encrypt the message payloads.
- Parameters:
publicKeys - the map of public key names and their URIs used to encrypt message payloads.
- Returns:
- the producer builder instance
- Since:
- 2.8.0
-
messageCrypto
Sets a MessageCrypto.
Contains methods to encrypt/decrypt messages for end-to-end encryption.
- Parameters:
messageCrypto - MessageCrypto object
- Returns:
- the producer builder instance
-
addEncryptionKey
Add public encryption key, used by producer to encrypt the data key.
At the time of producer creation, the Pulsar client checks if there are keys added to encryptionKeys. If keys are found, the callbacks CryptoKeyReader.getPrivateKey(String, Map) and CryptoKeyReader.getPublicKey(String, Map) are invoked against each key to load the values of the key.
The application should implement these callbacks to return the key in pkcs8 format. If compression is enabled, the message is encrypted after compression. If batch messaging is enabled, the batched message is encrypted.
- Parameters:
key - the name of the encryption key in the key store
- Returns:
- the producer builder instance
-
cryptoFailureAction
Set the ProducerCryptoFailureAction to the value specified.
- Parameters:
action - the action the producer will take in case of encryption failures
- Returns:
- the producer builder instance
-
batchingMaxPublishDelay
Set the time period within which messages sent will be batched if batch messages are enabled. The default value is 1 ms. If set to a non-zero value, messages will be queued until either:
- this time interval expires
- the max number of messages in a batch is reached (batchingMaxMessages(int))
- the max size of batch is reached
All messages will be published as a single batch message. The consumer will be delivered individual messages in the batch in the same order they were enqueued.
- Parameters:
batchDelay - the batch delay
timeUnit - the time unit of the batchDelay
- Returns:
- the producer builder instance
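The three flush conditions listed above combine as a logical OR. The sketch below models that decision with the default values this page states (1 ms, 1000 messages, 128KB); BatchFlushModel and its method are hypothetical and are not part of the client API.

```java
// Hypothetical model of the batch flush decision described above.
class BatchFlushModel {
    // Defaults as stated on this page.
    static final long MAX_PUBLISH_DELAY_MICROS = 1_000;   // 1 ms
    static final int MAX_MESSAGES = 1000;
    static final int MAX_BYTES = 128 * 1024;               // 128KB

    // A batch is flushed as soon as any one of the limits is reached.
    static boolean shouldFlush(int messagesInBatch, long bytesInBatch, long elapsedMicros) {
        return elapsedMicros >= MAX_PUBLISH_DELAY_MICROS
                || messagesInBatch >= MAX_MESSAGES
                || bytesInBatch >= MAX_BYTES;
    }

    public static void main(String[] args) {
        System.out.println(shouldFlush(10, 500, 100));      // no limit reached yet
        System.out.println(shouldFlush(1000, 500, 100));    // message count reached
        System.out.println(shouldFlush(10, 500, 1_000));    // delay expired
    }
}
```

In this model a lightly loaded producer flushes on the time limit, while a busy one flushes on the message or byte limit first.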
-
roundRobinRouterBatchingPartitionSwitchFrequency
Set the partition switch frequency while batching of messages is enabled and using round-robin routing mode for non-keyed messages. Default: 10.
The time period of partition switch is frequency * batchingMaxPublishDelay. During this period, all arriving messages will be routed to the same partition.
- Parameters:
frequency - the frequency of partition switch
- Returns:
- the producer builder instance
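The partition switch period is a simple product of the two settings. As a worked example, the hypothetical helper below computes it in milliseconds; with the defaults stated on this page (frequency 10, publish delay 1 ms) the period is 10 ms.

```java
// Worked example of: period = frequency * batchingMaxPublishDelay.
class PartitionSwitchPeriod {

    static long periodMillis(int frequency, long batchingMaxPublishDelayMillis) {
        return frequency * batchingMaxPublishDelayMillis;
    }

    public static void main(String[] args) {
        // Defaults from this page: frequency 10, delay 1 ms.
        System.out.println(periodMillis(10, 1) + " ms");
        // A longer delay stretches the period proportionally.
        System.out.println(periodMillis(10, 5) + " ms");
    }
}
```

A longer period keeps more consecutive messages on one partition, which tends to produce fuller batches at the cost of a less even short-term spread.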
-
batchingMaxMessages
Set the maximum number of messages permitted in a batch. The default value is 1000. If set to a value greater than 1, messages will be queued until this threshold is reached or batch interval has elapsed.
All messages in batch will be published as a single batch message. The consumer will be delivered individual messages in the batch in the same order they were enqueued.
- Parameters:
batchMessagesMaxMessagesPerBatch - maximum number of messages in a batch
- Returns:
- the producer builder instance
-
batchingMaxBytes
Set the maximum number of bytes permitted in a batch. The default value is 128KB. If set to a value greater than 0, messages will be queued until this threshold is reached or other batching conditions are met.
All messages in a batch will be published as a single batched message. The consumer will be delivered individual messages in the batch in the same order they were enqueued.
- Parameters:
batchingMaxBytes - maximum number of bytes in a batch
- Returns:
- the producer builder instance
-
batcherBuilder
Set the batcher builder BatcherBuilder of the producer. The producer will use the batcher builder to build a batch message container. This is only used when batching is enabled.
- Parameters:
batcherBuilder - batcher builder
- Returns:
- the producer builder instance
-
initialSequenceId
Set the baseline for sequence ids for messages published by the producer.
First message will use (initialSequenceId + 1) as its sequence id, and subsequent messages will be assigned incremental sequence ids, if not otherwise specified.
- Parameters:
initialSequenceId - the initial sequence id for the producer
- Returns:
- the producer builder instance
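The numbering rule above can be modeled with a counter that starts at the baseline and increments before each use. SequenceIdModel is a hypothetical class for illustration only and does not cover messages that set their sequence id explicitly.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of sequence id assignment from a baseline.
class SequenceIdModel {
    private final AtomicLong lastSequenceId;

    SequenceIdModel(long initialSequenceId) {
        this.lastSequenceId = new AtomicLong(initialSequenceId);
    }

    // The first call returns initialSequenceId + 1, then ids grow by one.
    long next() {
        return lastSequenceId.incrementAndGet();
    }

    public static void main(String[] args) {
        SequenceIdModel ids = new SequenceIdModel(99);
        System.out.println(ids.next());
        System.out.println(ids.next());
    }
}
```

A producer that resumes after a restart could pass the last id it knows was persisted as the baseline, so that numbering continues from the following value.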
-
property
Set a name/value property for this producer.
Properties are application-defined metadata that can be attached to the producer. When getting topic stats, this metadata will be associated to the producer stats for easier identification.
- Parameters:
key - the property key
value - the property value
- Returns:
- the producer builder instance
-
properties
Add all the properties in the provided map to the producer.
Properties are application-defined metadata that can be attached to the producer. When getting topic stats, this metadata will be associated to the producer stats for easier identification.
- Parameters:
properties - the map of properties
- Returns:
- the producer builder instance
-
intercept
Deprecated.
Add a set of ProducerInterceptors to the producer.
Interceptors can be used to trace the publish and acknowledgment operations happening in a producer.
- Parameters:
interceptors - the list of interceptors to intercept the producer created by this builder.
- Returns:
- the producer builder instance
-
intercept
Add a set of ProducerInterceptors to the producer.
Interceptors can be used to trace the publish and acknowledgment operations happening in a producer.
- Parameters:
interceptors - the list of interceptors to intercept the producer created by this builder.
- Returns:
- the producer builder instance
-
autoUpdatePartitions
If enabled, partitioned producer will automatically discover new partitions at runtime. This is only applied on partitioned topics.
Default is true.
- Parameters:
autoUpdate - whether to auto discover partition configuration changes
- Returns:
- the producer builder instance
-
autoUpdatePartitionsInterval
Set the interval of updating partitions. The default value is 1 minute. This only works if autoUpdatePartitions is enabled.
- Parameters:
interval - the interval of updating partitions
unit - the time unit of the interval.
- Returns:
- the producer builder instance
-
enableMultiSchema
Set the multiple schema mode for producer. If enabled, the producer can send a message with a schema different from the schema specified at creation.
Otherwise, if the producer tries to send a message with a different schema, an invalid message exception is thrown.
Enabled by default.
- Parameters:
multiSchema - enable or disable multiple schema mode
- Returns:
- the producer builder instance
- Since:
- 2.5.0
-
enableLazyStartPartitionedProducers
This config affects Shared mode producers of partitioned topics only. It controls whether producers register and connect immediately to the owner broker of each partition or start lazily on demand. The internal producer of one partition always starts immediately, as chosen by the routing policy, but the internal producers of any additional partitions are started on demand upon receiving their first message.
Using this mode can reduce the strain on brokers for topics with large numbers of partitions and when the SinglePartition or some custom partial partition routing policy like PartialRoundRobinMessageRouterImpl is used without keyed messages. Because producer connection can be on demand, this can produce extra send latency for the first messages of a given partition.
- Parameters:
lazyStartPartitionedProducers - enable or disable starting partition producers lazily
- Returns:
- the producer builder instance
-