pulsar-client-cpp
Public Types | Public Member Functions | Friends | List of all members
pulsar::ProducerConfiguration Class Reference

#include <ProducerConfiguration.h>

Public Types

enum  PartitionsRoutingMode { UseSinglePartition, RoundRobinDistribution, CustomPartition }
 
enum  HashingScheme { Murmur3_32Hash, BoostHash, JavaStringHash }
 
enum  BatchingType { DefaultBatching, KeyBasedBatching }
 
enum  ProducerAccessMode { Shared = 0, Exclusive = 1, WaitForExclusive = 2, ExclusiveWithFencing = 3 }
 

Public Member Functions

 ProducerConfiguration (const ProducerConfiguration &)
 
ProducerConfigurationoperator= (const ProducerConfiguration &)
 
ProducerConfigurationsetProducerName (const std::string &producerName)
 
const std::string & getProducerName () const
 
ProducerConfigurationsetSchema (const SchemaInfo &schemaInfo)
 
const SchemaInfogetSchema () const
 
ProducerConfigurationsetSendTimeout (int sendTimeoutMs)
 
int getSendTimeout () const
 
ProducerConfigurationsetInitialSequenceId (int64_t initialSequenceId)
 
int64_t getInitialSequenceId () const
 
ProducerConfigurationsetCompressionType (CompressionType compressionType)
 
CompressionType getCompressionType () const
 
ProducerConfigurationsetMaxPendingMessages (int maxPendingMessages)
 
int getMaxPendingMessages () const
 
ProducerConfigurationsetMaxPendingMessagesAcrossPartitions (int maxPendingMessagesAcrossPartitions)
 
int getMaxPendingMessagesAcrossPartitions () const
 
ProducerConfigurationsetPartitionsRoutingMode (const PartitionsRoutingMode &mode)
 
PartitionsRoutingMode getPartitionsRoutingMode () const
 
ProducerConfigurationsetMessageRouter (const MessageRoutingPolicyPtr &router)
 
const MessageRoutingPolicyPtr & getMessageRouterPtr () const
 
ProducerConfigurationsetHashingScheme (const HashingScheme &scheme)
 
HashingScheme getHashingScheme () const
 
ProducerConfigurationsetLazyStartPartitionedProducers (bool)
 
bool getLazyStartPartitionedProducers () const
 
ProducerConfigurationsetBlockIfQueueFull (bool)
 
bool getBlockIfQueueFull () const
 
ProducerConfigurationsetBatchingEnabled (const bool &batchingEnabled)
 
const bool & getBatchingEnabled () const
 
ProducerConfigurationsetBatchingMaxMessages (const unsigned int &batchingMaxMessages)
 
const unsigned int & getBatchingMaxMessages () const
 
ProducerConfigurationsetBatchingMaxAllowedSizeInBytes (const unsigned long &batchingMaxAllowedSizeInBytes)
 
const unsigned long & getBatchingMaxAllowedSizeInBytes () const
 
ProducerConfigurationsetBatchingMaxPublishDelayMs (const unsigned long &batchingMaxPublishDelayMs)
 
const unsigned long & getBatchingMaxPublishDelayMs () const
 
ProducerConfigurationsetBatchingType (BatchingType batchingType)
 
BatchingType getBatchingType () const
 
const CryptoKeyReaderPtr getCryptoKeyReader () const
 
ProducerConfigurationsetCryptoKeyReader (CryptoKeyReaderPtr cryptoKeyReader)
 
ProducerCryptoFailureAction getCryptoFailureAction () const
 
ProducerConfigurationsetCryptoFailureAction (ProducerCryptoFailureAction action)
 
const std::set< std::string > & getEncryptionKeys () const
 
bool isEncryptionEnabled () const
 
ProducerConfigurationaddEncryptionKey (std::string key)
 
bool hasProperty (const std::string &name) const
 
const std::string & getProperty (const std::string &name) const
 
std::map< std::string, std::string > & getProperties () const
 
ProducerConfigurationsetProperty (const std::string &name, const std::string &value)
 
ProducerConfigurationsetProperties (const std::map< std::string, std::string > &properties)
 
ProducerConfigurationsetChunkingEnabled (bool chunkingEnabled)
 
bool isChunkingEnabled () const
 
ProducerConfigurationsetAccessMode (const ProducerAccessMode &accessMode)
 
ProducerAccessMode getAccessMode () const
 
ProducerConfigurationintercept (const std::vector< ProducerInterceptorPtr > &interceptors)
 
const std::vector< ProducerInterceptorPtr > & getInterceptors () const
 

Friends

class PulsarWrapper
 
class ConsumerImpl
 
class ProducerImpl
 

Detailed Description

Class that holds the configuration for a producer

Member Enumeration Documentation

◆ BatchingType

Enumerator
DefaultBatching 

Default batching.

incoming single messages: (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)

batched into single batch message: [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)]

KeyBasedBatching 

Key based batching.

incoming single messages: (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)

batched into single batch message: [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)]

◆ ProducerAccessMode

Enumerator
Shared 

By default multiple producers can publish on a topic.

Exclusive 

Require exclusive access for producer. Fail immediately if there's already a producer connected.

WaitForExclusive 

Producer creation is pending until it can acquire exclusive access.

ExclusiveWithFencing 

Acquire exclusive access for the producer. Any existing producer will be removed and invalidated immediately.

Member Function Documentation

◆ addEncryptionKey()

ProducerConfiguration& pulsar::ProducerConfiguration::addEncryptionKey ( std::string  key)

Add public encryption key, used by producer to encrypt the data key.

At the time of producer creation, Pulsar client checks if there are keys added to encryptionKeys. If keys are found, a callback getKey(String keyName) is invoked against each key to load the values of the key. Application should implement this callback to return the key in pkcs8 format. If compression is enabled, message is encrypted after compression. If batch messaging is enabled, the batched message is encrypted.

@key the encryption key to add

Returns
the ProducerConfiguration self

◆ getAccessMode()

ProducerAccessMode pulsar::ProducerConfiguration::getAccessMode ( ) const

Get the type of access mode that the producer requires on the topic.

◆ getBatchingEnabled()

const bool& pulsar::ProducerConfiguration::getBatchingEnabled ( ) const

Return the flag whether automatic message batching is enabled or not for the producer.

Returns
true if automatic message batching is enabled. Otherwise it returns false.
Since
2.0.0
It is enabled by default.

◆ getBatchingMaxAllowedSizeInBytes()

const unsigned long& pulsar::ProducerConfiguration::getBatchingMaxAllowedSizeInBytes ( ) const

The getter associated with setBatchingMaxAllowedSizeInBytes().

◆ getBatchingMaxMessages()

const unsigned int& pulsar::ProducerConfiguration::getBatchingMaxMessages ( ) const

The getter associated with setBatchingMaxMessages().

◆ getBatchingMaxPublishDelayMs()

const unsigned long& pulsar::ProducerConfiguration::getBatchingMaxPublishDelayMs ( ) const

The getter associated with setBatchingMaxPublishDelayMs().

◆ getBatchingType()

BatchingType pulsar::ProducerConfiguration::getBatchingType ( ) const
Returns
batching type.
See also
BatchingType.

◆ getBlockIfQueueFull()

bool pulsar::ProducerConfiguration::getBlockIfQueueFull ( ) const
Returns
whether Producer::send or Producer::sendAsync operations should block when the outgoing message queue is full. (Default: false)

◆ getCompressionType()

CompressionType pulsar::ProducerConfiguration::getCompressionType ( ) const

The getter associated with setCompressionType().

◆ getCryptoFailureAction()

ProducerCryptoFailureAction pulsar::ProducerConfiguration::getCryptoFailureAction ( ) const

The getter associated with setCryptoFailureAction().

◆ getCryptoKeyReader()

const CryptoKeyReaderPtr pulsar::ProducerConfiguration::getCryptoKeyReader ( ) const

The getter associated with setCryptoKeyReader().

◆ getEncryptionKeys()

const std::set<std::string>& pulsar::ProducerConfiguration::getEncryptionKeys ( ) const
Returns
all the encryption keys added

◆ getHashingScheme()

HashingScheme pulsar::ProducerConfiguration::getHashingScheme ( ) const

The getter associated with setHashingScheme().

◆ getInitialSequenceId()

int64_t pulsar::ProducerConfiguration::getInitialSequenceId ( ) const

The getter associated with setInitialSequenceId().

◆ getLazyStartPartitionedProducers()

bool pulsar::ProducerConfiguration::getLazyStartPartitionedProducers ( ) const

The getter associated with setLazyStartPartitionedProducers()

◆ getMaxPendingMessages()

int pulsar::ProducerConfiguration::getMaxPendingMessages ( ) const

The getter associated with setMaxPendingMessages().

◆ getMaxPendingMessagesAcrossPartitions()

int pulsar::ProducerConfiguration::getMaxPendingMessagesAcrossPartitions ( ) const
Returns
the maximum number of pending messages allowed across all the partitions

◆ getMessageRouterPtr()

const MessageRoutingPolicyPtr& pulsar::ProducerConfiguration::getMessageRouterPtr ( ) const

The getter associated with setMessageRouter().

◆ getPartitionsRoutingMode()

PartitionsRoutingMode pulsar::ProducerConfiguration::getPartitionsRoutingMode ( ) const

The getter associated with setPartitionsRoutingMode().

◆ getProducerName()

const std::string& pulsar::ProducerConfiguration::getProducerName ( ) const

The getter associated with setProducerName().

◆ getProperties()

std::map<std::string, std::string>& pulsar::ProducerConfiguration::getProperties ( ) const

Get all the properties attached to this producer.

◆ getProperty()

const std::string& pulsar::ProducerConfiguration::getProperty ( const std::string &  name) const

Get the value of a specific property

Parameters
namethe name of the property
Returns
the value of the property or null if the property was not defined

◆ getSchema()

const SchemaInfo& pulsar::ProducerConfiguration::getSchema ( ) const
Returns
the schema information declared for this producer

◆ getSendTimeout()

int pulsar::ProducerConfiguration::getSendTimeout ( ) const

Get the send timeout is milliseconds.

If a message is not acknowledged by the server before the sendTimeout expires, an error will be reported.

If the timeout is zero, there will be no timeout.

Returns
the send timeout in milliseconds (Default: 30000)

◆ hasProperty()

bool pulsar::ProducerConfiguration::hasProperty ( const std::string &  name) const

Check whether the producer has a specific property attached.

Parameters
namethe name of the property to check
Returns
true if the message has the specified property
false if the property is not defined

◆ isChunkingEnabled()

bool pulsar::ProducerConfiguration::isChunkingEnabled ( ) const

The getter associated with setChunkingEnabled().

◆ isEncryptionEnabled()

bool pulsar::ProducerConfiguration::isEncryptionEnabled ( ) const
Returns
true if encryption keys are added

◆ setAccessMode()

ProducerConfiguration& pulsar::ProducerConfiguration::setAccessMode ( const ProducerAccessMode accessMode)

Set the type of access mode that the producer requires on the topic.

See also
ProducerAccessMode
Parameters
accessModeThe type of access to the topic that the producer requires

◆ setBatchingEnabled()

ProducerConfiguration& pulsar::ProducerConfiguration::setBatchingEnabled ( const bool &  batchingEnabled)

Control whether automatic batching of messages is enabled or not for the producer.

Default: true

When automatic batching is enabled, multiple calls to Producer::sendAsync can result in a single batch to be sent to the broker, leading to better throughput, especially when publishing small messages. If compression is enabled, messages are compressed at the batch level, leading to a much better compression ratio for similar headers or contents.

When the default batch delay is set to 10 ms and the default batch size is 1000 messages.

See also
ProducerConfiguration::setBatchingMaxPublishDelayMs

◆ setBatchingMaxAllowedSizeInBytes()

ProducerConfiguration& pulsar::ProducerConfiguration::setBatchingMaxAllowedSizeInBytes ( const unsigned long &  batchingMaxAllowedSizeInBytes)

Set the max size of messages permitted in a batch. Default value: 128 KB. If you set this option to a value greater than 1, messages are queued until this threshold is reached or batch interval has elapsed.

All messages in a batch are published as a single batch message. The consumer is delivered individual messages in the batch in the same order they are enqueued.

Parameters
batchingMaxAllowedSizeInBytes

◆ setBatchingMaxMessages()

ProducerConfiguration& pulsar::ProducerConfiguration::setBatchingMaxMessages ( const unsigned int &  batchingMaxMessages)

Set the max number of messages permitted in a batch. Default value: 1000. If you set this option to a value greater than 1, messages are queued until this threshold is reached or batch interval has elapsed.

All messages in a batch are published as a single batch message. The consumer is delivered individual messages in the batch in the same order they are enqueued.

Parameters
batchMessagesMaxMessagesPerBatchmax number of messages permitted in a batch
Returns

◆ setBatchingMaxPublishDelayMs()

ProducerConfiguration& pulsar::ProducerConfiguration::setBatchingMaxPublishDelayMs ( const unsigned long &  batchingMaxPublishDelayMs)

Set the max time for message publish delay permitted in a batch. Default value: 10 ms.

Parameters
batchingMaxPublishDelayMsmax time for message publish delay permitted in a batch.
Returns

◆ setBatchingType()

ProducerConfiguration& pulsar::ProducerConfiguration::setBatchingType ( BatchingType  batchingType)

Default: DefaultBatching

See also
BatchingType

◆ setBlockIfQueueFull()

ProducerConfiguration& pulsar::ProducerConfiguration::setBlockIfQueueFull ( bool  )

The setter associated with getBlockIfQueueFull()

◆ setChunkingEnabled()

ProducerConfiguration& pulsar::ProducerConfiguration::setChunkingEnabled ( bool  chunkingEnabled)

If message size is higher than allowed max publish-payload size by broker then enableChunking helps producer to split message into multiple chunks and publish them to broker separately in order. So, it allows client to successfully publish large size of messages in pulsar.

Set it true to enable this feature. If so, you must disable batching (see setBatchingEnabled), otherwise the producer creation will fail.

There are some other recommendations when it's enabled:

  1. This features is right now only supported for non-shared subscription and persistent-topic.
  2. It's better to reduce setMaxPendingMessages to avoid producer accupying large amount of memory by buffered messages.
  3. Set message-ttl on the namespace to cleanup chunked messages. Sometimes due to broker-restart or publish time, producer might fail to publish entire large message. So, consumer will not be able to consume and ack those messages.

Default: false

Parameters
chunkingEnabledwhether chunking is enabled
Returns
the ProducerConfiguration self

◆ setCompressionType()

ProducerConfiguration& pulsar::ProducerConfiguration::setCompressionType ( CompressionType  compressionType)

Set the compression type for the producer.

By default, message payloads are not compressed. Supported compression types are:

◆ setCryptoFailureAction()

ProducerConfiguration& pulsar::ProducerConfiguration::setCryptoFailureAction ( ProducerCryptoFailureAction  action)

Sets the ProducerCryptoFailureAction to the value specified.

Parameters
actionthe action taken by the producer in case of encryption failures.
Returns

◆ setCryptoKeyReader()

ProducerConfiguration& pulsar::ProducerConfiguration::setCryptoKeyReader ( CryptoKeyReaderPtr  cryptoKeyReader)

Set the shared pointer to CryptoKeyReader.

Parameters
sharedpointer to CryptoKeyReader.
Returns

◆ setHashingScheme()

ProducerConfiguration& pulsar::ProducerConfiguration::setHashingScheme ( const HashingScheme &  scheme)

Set the hashing scheme, which is a standard hashing function available when choosing the partition used for a particular message.

Default: HashingScheme::BoostHash

Standard hashing functions available are:

Parameters
schemehashing scheme.
Returns

◆ setInitialSequenceId()

ProducerConfiguration& pulsar::ProducerConfiguration::setInitialSequenceId ( int64_t  initialSequenceId)

Set the baseline of the sequence ID for messages published by the producer.

The first message uses (initialSequenceId + 1) as its sequence ID and subsequent messages are assigned incremental sequence IDs.

Default: -1, which means the first message's sequence ID is 0.

Parameters
initialSequenceIdthe initial sequence ID for the producer.
Returns

◆ setLazyStartPartitionedProducers()

ProducerConfiguration& pulsar::ProducerConfiguration::setLazyStartPartitionedProducers ( bool  )

This config affects producers of partitioned topics only. It controls whether producers register and connect immediately to the owner broker of each partition or start lazily on demand. The internal producer of one partition is always started eagerly, chosen by the routing policy, but the internal producers of any additional partitions are started on demand, upon receiving their first message. Using this mode can reduce the strain on brokers for topics with large numbers of partitions and when the SinglePartition routing policy is used without keyed messages. Because producer connection can be on demand, this can produce extra send latency for the first messages of a given partition.

Parameters
true/falseas to whether to start partition producers lazily
Returns

◆ setMaxPendingMessages()

ProducerConfiguration& pulsar::ProducerConfiguration::setMaxPendingMessages ( int  maxPendingMessages)

Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.

When the queue is full, by default, all calls to Producer::send and Producer::sendAsync would fail unless blockIfQueueFull is set to true. Use setBlockIfQueueFull to change the blocking behavior.

Default: 1000

Parameters
maxPendingMessagesmax number of pending messages.
Returns

◆ setMaxPendingMessagesAcrossPartitions()

ProducerConfiguration& pulsar::ProducerConfiguration::setMaxPendingMessagesAcrossPartitions ( int  maxPendingMessagesAcrossPartitions)

Set the number of max pending messages across all the partitions

This setting will be used to lower the max pending messages for each partition (setMaxPendingMessages(int)), if the total exceeds the configured value.

Default: 50000

Parameters
maxPendingMessagesAcrossPartitions

◆ setMessageRouter()

ProducerConfiguration& pulsar::ProducerConfiguration::setMessageRouter ( const MessageRoutingPolicyPtr &  router)

Set a custom message routing policy by passing an implementation of MessageRouter.

Parameters
messageRoutermessage router.
Returns

◆ setPartitionsRoutingMode()

ProducerConfiguration& pulsar::ProducerConfiguration::setPartitionsRoutingMode ( const PartitionsRoutingMode &  mode)

Set the message routing modes for partitioned topics.

Default: UseSinglePartition

Parameters
PartitionsRoutingModepartition routing mode.
Returns

◆ setProducerName()

ProducerConfiguration& pulsar::ProducerConfiguration::setProducerName ( const std::string &  producerName)

Set the producer name which could be assigned by the system or specified by the client.

Parameters
producerNameproducer name.
Returns

◆ setProperties()

ProducerConfiguration& pulsar::ProducerConfiguration::setProperties ( const std::map< std::string, std::string > &  properties)

Add all the properties in the provided map

◆ setProperty()

ProducerConfiguration& pulsar::ProducerConfiguration::setProperty ( const std::string &  name,
const std::string &  value 
)

Sets a new property on the producer

Parameters
namethe name of the property
valuethe associated value

◆ setSchema()

ProducerConfiguration& pulsar::ProducerConfiguration::setSchema ( const SchemaInfo schemaInfo)

Declare the schema of the data that will be published by this producer.

The schema will be checked against the schema of the topic, and it will fail if it's not compatible, though the client library will not perform any validation that the actual message payload are conforming to the specified schema.

For all purposes, this

Parameters
schemaInfo
Returns

◆ setSendTimeout()

ProducerConfiguration& pulsar::ProducerConfiguration::setSendTimeout ( int  sendTimeoutMs)

The getter associated with getSendTimeout()


The documentation for this class was generated from the following file: