pulsar-client-cpp
Public Member Functions | Friends | List of all members
pulsar::ConsumerConfiguration Class Reference

#include <ConsumerConfiguration.h>

Public Member Functions

 ConsumerConfiguration (const ConsumerConfiguration &)
 
ConsumerConfiguration & operator= (const ConsumerConfiguration &)
 
ConsumerConfiguration clone () const
 
ConsumerConfiguration & setSchema (const SchemaInfo &schemaInfo)
 
const SchemaInfo & getSchema () const
 
ConsumerConfiguration & setConsumerType (ConsumerType consumerType)
 
ConsumerType getConsumerType () const
 
ConsumerConfiguration & setKeySharedPolicy (KeySharedPolicy keySharedPolicy)
 
KeySharedPolicy getKeySharedPolicy () const
 
ConsumerConfiguration & setMessageListener (MessageListener messageListener)
 
template<typename T >
ConsumerConfiguration & setTypedMessageListener (std::function< void(Consumer &, const TypedMessage< T > &)> listener, typename TypedMessage< T >::Decoder decoder)
 
MessageListener getMessageListener () const
 
bool hasMessageListener () const
 
ConsumerConfiguration & setConsumerEventListener (ConsumerEventListenerPtr eventListener)
 
ConsumerEventListenerPtr getConsumerEventListener () const
 
bool hasConsumerEventListener () const
 
void setReceiverQueueSize (int size)
 
int getReceiverQueueSize () const
 
void setMaxTotalReceiverQueueSizeAcrossPartitions (int maxTotalReceiverQueueSizeAcrossPartitions)
 
int getMaxTotalReceiverQueueSizeAcrossPartitions () const
 
void setConsumerName (const std::string &consumerName)
 
const std::string & getConsumerName () const
 
void setUnAckedMessagesTimeoutMs (const uint64_t milliSeconds)
 
long getUnAckedMessagesTimeoutMs () const
 
void setTickDurationInMs (const uint64_t milliSeconds)
 
long getTickDurationInMs () const
 
void setNegativeAckRedeliveryDelayMs (long redeliveryDelayMillis)
 
long getNegativeAckRedeliveryDelayMs () const
 
void setAckGroupingTimeMs (long ackGroupingMillis)
 
long getAckGroupingTimeMs () const
 
void setAckGroupingMaxSize (long maxGroupingSize)
 
long getAckGroupingMaxSize () const
 
void setBrokerConsumerStatsCacheTimeInMs (const long cacheTimeInMs)
 
long getBrokerConsumerStatsCacheTimeInMs () const
 
bool isEncryptionEnabled () const
 
const CryptoKeyReaderPtr getCryptoKeyReader () const
 
ConsumerConfiguration & setCryptoKeyReader (CryptoKeyReaderPtr cryptoKeyReader)
 
ConsumerCryptoFailureAction getCryptoFailureAction () const
 
ConsumerConfiguration & setCryptoFailureAction (ConsumerCryptoFailureAction action)
 
bool isReadCompacted () const
 
void setReadCompacted (bool compacted)
 
void setPatternAutoDiscoveryPeriod (int periodInSeconds)
 
int getPatternAutoDiscoveryPeriod () const
 
ConsumerConfiguration & setRegexSubscriptionMode (RegexSubscriptionMode regexSubscriptionMode)
 
RegexSubscriptionMode getRegexSubscriptionMode () const
 
void setSubscriptionInitialPosition (InitialPosition subscriptionInitialPosition)
 
InitialPosition getSubscriptionInitialPosition () const
 
void setBatchReceivePolicy (const BatchReceivePolicy &batchReceivePolicy)
 
const BatchReceivePolicy & getBatchReceivePolicy () const
 
void setDeadLetterPolicy (const DeadLetterPolicy &deadLetterPolicy)
 
const DeadLetterPolicy & getDeadLetterPolicy () const
 
void setReplicateSubscriptionStateEnabled (bool enabled)
 
bool isReplicateSubscriptionStateEnabled () const
 
bool hasProperty (const std::string &name) const
 
const std::string & getProperty (const std::string &name) const
 
std::map< std::string, std::string > & getProperties () const
 
ConsumerConfiguration & setProperty (const std::string &name, const std::string &value)
 
ConsumerConfiguration & setProperties (const std::map< std::string, std::string > &properties)
 
std::map< std::string, std::string > & getSubscriptionProperties () const
 
ConsumerConfiguration & setSubscriptionProperties (const std::map< std::string, std::string > &subscriptionProperties)
 
ConsumerConfiguration & setPriorityLevel (int priorityLevel)
 
int getPriorityLevel () const
 
ConsumerConfiguration & setMaxPendingChunkedMessage (size_t maxPendingChunkedMessage)
 
size_t getMaxPendingChunkedMessage () const
 
ConsumerConfiguration & setAutoAckOldestChunkedMessageOnQueueFull (bool autoAckOldestChunkedMessageOnQueueFull)
 
bool isAutoAckOldestChunkedMessageOnQueueFull () const
 
ConsumerConfiguration & setExpireTimeOfIncompleteChunkedMessageMs (long expireTimeOfIncompleteChunkedMessageMs)
 
long getExpireTimeOfIncompleteChunkedMessageMs () const
 
ConsumerConfiguration & setStartMessageIdInclusive (bool startMessageIdInclusive)
 
bool isStartMessageIdInclusive () const
 
ConsumerConfiguration & setBatchIndexAckEnabled (bool enabled)
 
bool isBatchIndexAckEnabled () const
 
ConsumerConfiguration & intercept (const std::vector< ConsumerInterceptorPtr > &interceptors)
 
const std::vector< ConsumerInterceptorPtr > & getInterceptors () const
 
ConsumerConfiguration & setAckReceiptEnabled (bool ackReceiptEnabled)
 
bool isAckReceiptEnabled () const
 

Friends

class PulsarWrapper
 
class PulsarFriend
 

Detailed Description

Class specifying the configuration of a consumer.

Member Function Documentation

◆ clone()

ConsumerConfiguration pulsar::ConsumerConfiguration::clone ( ) const

Create a new instance of ConsumerConfiguration with the same initial settings as the current one.

◆ getAckGroupingMaxSize()

long pulsar::ConsumerConfiguration::getAckGroupingMaxSize ( ) const

Get max number of grouped messages within one grouping time window.

Returns
max number of grouped messages within one grouping time window.

◆ getAckGroupingTimeMs()

long pulsar::ConsumerConfiguration::getAckGroupingTimeMs ( ) const

Get grouping time window in milliseconds.

Returns
grouping time window in milliseconds.

◆ getBatchReceivePolicy()

const BatchReceivePolicy& pulsar::ConsumerConfiguration::getBatchReceivePolicy ( ) const

Get batch receive policy.

Returns
batch receive policy

◆ getBrokerConsumerStatsCacheTimeInMs()

long pulsar::ConsumerConfiguration::getBrokerConsumerStatsCacheTimeInMs ( ) const
Returns
the configured time, in milliseconds, for which BrokerConsumerStats are cached.

◆ getConsumerEventListener()

ConsumerEventListenerPtr pulsar::ConsumerConfiguration::getConsumerEventListener ( ) const
Returns
the consumer event listener

◆ getConsumerName()

const std::string& pulsar::ConsumerConfiguration::getConsumerName ( ) const
Returns
the consumer name

◆ getConsumerType()

ConsumerType pulsar::ConsumerConfiguration::getConsumerType ( ) const
Returns
the consumer type

◆ getCryptoFailureAction()

ConsumerCryptoFailureAction pulsar::ConsumerConfiguration::getCryptoFailureAction ( ) const
Returns
the ConsumerCryptoFailureAction

◆ getCryptoKeyReader()

const CryptoKeyReaderPtr pulsar::ConsumerConfiguration::getCryptoKeyReader ( ) const
Returns
the shared pointer to CryptoKeyReader.

◆ getDeadLetterPolicy()

const DeadLetterPolicy& pulsar::ConsumerConfiguration::getDeadLetterPolicy ( ) const

Get dead letter policy.

Returns
dead letter policy

◆ getExpireTimeOfIncompleteChunkedMessageMs()

long pulsar::ConsumerConfiguration::getExpireTimeOfIncompleteChunkedMessageMs ( ) const

Get the expire time of incomplete chunked message in milliseconds

Returns
the expire time of incomplete chunked message in milliseconds

◆ getKeySharedPolicy()

KeySharedPolicy pulsar::ConsumerConfiguration::getKeySharedPolicy ( ) const
Returns
the KeyShared subscription policy

◆ getMaxPendingChunkedMessage()

size_t pulsar::ConsumerConfiguration::getMaxPendingChunkedMessage ( ) const

The associated getter of setMaxPendingChunkedMessage

◆ getMaxTotalReceiverQueueSizeAcrossPartitions()

int pulsar::ConsumerConfiguration::getMaxTotalReceiverQueueSizeAcrossPartitions ( ) const
Returns
the configured max total receiver queue size across partitions

◆ getMessageListener()

MessageListener pulsar::ConsumerConfiguration::getMessageListener ( ) const
Returns
the message listener

◆ getNegativeAckRedeliveryDelayMs()

long pulsar::ConsumerConfiguration::getNegativeAckRedeliveryDelayMs ( ) const

Get the configured delay to wait before re-delivering messages that have failed to be processed.

Returns
redelivery delay for failed messages

◆ getPatternAutoDiscoveryPeriod()

int pulsar::ConsumerConfiguration::getPatternAutoDiscoveryPeriod ( ) const
Returns
the period, in seconds, at which the PatternMultiTopicsConsumer performs a pattern auto discovery

◆ getPriorityLevel()

int pulsar::ConsumerConfiguration::getPriorityLevel ( ) const
Returns
the configured priority for the consumer

◆ getProperties()

std::map<std::string, std::string>& pulsar::ConsumerConfiguration::getProperties ( ) const

Get all the properties attached to this consumer.

◆ getProperty()

const std::string& pulsar::ConsumerConfiguration::getProperty ( const std::string &  name) const

Get the value of a specific property

Parameters
name: the name of the property
Returns
the value of the property, or an empty string if the property was not defined

◆ getReceiverQueueSize()

int pulsar::ConsumerConfiguration::getReceiverQueueSize ( ) const
Returns
the receiver queue size

◆ getRegexSubscriptionMode()

RegexSubscriptionMode pulsar::ConsumerConfiguration::getRegexSubscriptionMode ( ) const
Returns
the regex subscription mode for the pattern consumer.

◆ getSchema()

const SchemaInfo& pulsar::ConsumerConfiguration::getSchema ( ) const
Returns
the schema information declared for this consumer

◆ getSubscriptionInitialPosition()

InitialPosition pulsar::ConsumerConfiguration::getSubscriptionInitialPosition ( ) const
Returns
the configured InitialPosition for the consumer

◆ getSubscriptionProperties()

std::map<std::string, std::string>& pulsar::ConsumerConfiguration::getSubscriptionProperties ( ) const

Get all the subscription properties attached to this subscription.

◆ getTickDurationInMs()

long pulsar::ConsumerConfiguration::getTickDurationInMs ( ) const
Returns
the tick duration time (in milliseconds)

◆ getUnAckedMessagesTimeoutMs()

long pulsar::ConsumerConfiguration::getUnAckedMessagesTimeoutMs ( ) const
Returns
the configured timeout in milliseconds for unacked messages.

◆ hasConsumerEventListener()

bool pulsar::ConsumerConfiguration::hasConsumerEventListener ( ) const
Returns
true if the consumer event listener has been set

◆ hasMessageListener()

bool pulsar::ConsumerConfiguration::hasMessageListener ( ) const
Returns
true if the message listener has been set

◆ hasProperty()

bool pulsar::ConsumerConfiguration::hasProperty ( const std::string &  name) const

Check whether the consumer configuration has a specific property attached.

Parameters
name: the name of the property to check
Returns
true if the consumer configuration has the specified property
false if the property is not defined

◆ intercept()

ConsumerConfiguration& pulsar::ConsumerConfiguration::intercept ( const std::vector< ConsumerInterceptorPtr > &  interceptors)

Intercept the consumer

Parameters
interceptors: the list of interceptors to intercept the consumer
Returns
Consumer Configuration

◆ isAckReceiptEnabled()

bool pulsar::ConsumerConfiguration::isAckReceiptEnabled ( ) const

The associated getter of setAckReceiptEnabled.

◆ isAutoAckOldestChunkedMessageOnQueueFull()

bool pulsar::ConsumerConfiguration::isAutoAckOldestChunkedMessageOnQueueFull ( ) const

The associated getter of setAutoAckOldestChunkedMessageOnQueueFull

◆ isBatchIndexAckEnabled()

bool pulsar::ConsumerConfiguration::isBatchIndexAckEnabled ( ) const

The associated getter of setBatchIndexAckEnabled

◆ isEncryptionEnabled()

bool pulsar::ConsumerConfiguration::isEncryptionEnabled ( ) const
Returns
true if encryption keys are added

◆ isReadCompacted()

bool pulsar::ConsumerConfiguration::isReadCompacted ( ) const
Returns
true if readCompacted is enabled

◆ isReplicateSubscriptionStateEnabled()

bool pulsar::ConsumerConfiguration::isReplicateSubscriptionStateEnabled ( ) const
Returns
whether the subscription status should be replicated

◆ isStartMessageIdInclusive()

bool pulsar::ConsumerConfiguration::isStartMessageIdInclusive ( ) const

The associated getter of setStartMessageIdInclusive

◆ setAckGroupingMaxSize()

void pulsar::ConsumerConfiguration::setAckGroupingMaxSize ( long  maxGroupingSize)

Set max number of grouped messages within one grouping time window. If it's set to a non-positive value, number of grouped messages is not limited. Default is 1000.

Parameters
maxGroupingSize: max number of grouped messages within one grouping time window.

◆ setAckGroupingTimeMs()

void pulsar::ConsumerConfiguration::setAckGroupingTimeMs ( long  ackGroupingMillis)

Set the time window in milliseconds for grouping message ACK requests. An ACK request is not sent to the broker until the time window reaches its end, or the number of grouped messages reaches the limit. Default is 100 milliseconds. If it's set to a non-positive value, ACK requests will be sent directly to the broker without grouping.

Parameters
ackGroupingMillis: time of ACK grouping window in milliseconds.

◆ setAckReceiptEnabled()

ConsumerConfiguration& pulsar::ConsumerConfiguration::setAckReceiptEnabled ( bool  ackReceiptEnabled)

Whether to receive the ACK receipt from broker.

By default, when Consumer::acknowledge is called, it won't wait for the corresponding response from the broker. After it's enabled, the acknowledge method will return a Result that indicates if the acknowledgment succeeded.

Default: false

◆ setAutoAckOldestChunkedMessageOnQueueFull()

ConsumerConfiguration& pulsar::ConsumerConfiguration::setAutoAckOldestChunkedMessageOnQueueFull ( bool  autoAckOldestChunkedMessageOnQueueFull)

Buffering a large number of outstanding incomplete chunked messages can create memory pressure, which can be guarded against by providing the maxPendingChunkedMessage threshold. See setMaxPendingChunkedMessage. Once the consumer reaches this threshold, it drops the outstanding incomplete chunked messages by silently acking them if autoAckOldestChunkedMessageOnQueueFull is true; otherwise it marks them for redelivery.

Default: false

Parameters
autoAckOldestChunkedMessageOnQueueFull: whether to ack the discarded chunked message

◆ setBatchIndexAckEnabled()

ConsumerConfiguration& pulsar::ConsumerConfiguration::setBatchIndexAckEnabled ( bool  enabled)

Enable the batch index acknowledgment.

It should be noted that this option can only work when the broker side also enables the batch index acknowledgment. See the acknowledgmentAtBatchIndexLevelEnabled config in broker.conf.

Default: false

Parameters
enabled: whether to enable the batch index acknowledgment

◆ setBatchReceivePolicy()

void pulsar::ConsumerConfiguration::setBatchReceivePolicy ( const BatchReceivePolicy &  batchReceivePolicy)

Set batch receive policy.

Parameters
batchReceivePolicy: the default is {maxNumMessage: -1, maxNumBytes: 10 * 1024 * 1024, timeoutMs: 100}

◆ setBrokerConsumerStatsCacheTimeInMs()

void pulsar::ConsumerConfiguration::setBrokerConsumerStatsCacheTimeInMs ( const long  cacheTimeInMs)

Set the time duration for which the broker side consumer stats will be cached in the client.

Default: 30000, which means 30 seconds.

Parameters
cacheTimeInMs: the cache time in milliseconds

◆ setConsumerEventListener()

ConsumerConfiguration& pulsar::ConsumerConfiguration::setConsumerEventListener ( ConsumerEventListenerPtr  eventListener)

An event listener enables your application to react to the consumer state change event (active or inactive).

◆ setConsumerName()

void pulsar::ConsumerConfiguration::setConsumerName ( const std::string &  consumerName)

Set the consumer name.

Parameters
consumerName: the name to assign to this consumer

◆ setConsumerType()

ConsumerConfiguration& pulsar::ConsumerConfiguration::setConsumerType ( ConsumerType  consumerType)

Specify the consumer type. The consumer type determines the type of subscription. In an Exclusive subscription, only a single consumer is allowed to attach to the subscription; other consumers will get an error message. In a Shared subscription, multiple consumers can use the same subscription name and the messages are dispatched in a round-robin fashion. In a Failover subscription, multiple consumers can attach to a single subscription, though only one of them is the primary at a given time. Only the primary consumer receives messages. When the primary consumer gets disconnected, one of the failover consumers is promoted to primary and starts receiving messages.

◆ setCryptoFailureAction()

ConsumerConfiguration& pulsar::ConsumerConfiguration::setCryptoFailureAction ( ConsumerCryptoFailureAction  action)

Set the ConsumerCryptoFailureAction.

◆ setCryptoKeyReader()

ConsumerConfiguration& pulsar::ConsumerConfiguration::setCryptoKeyReader ( CryptoKeyReaderPtr  cryptoKeyReader)

Set the shared pointer to CryptoKeyReader.

Parameters
cryptoKeyReader: the shared pointer to CryptoKeyReader

◆ setDeadLetterPolicy()

void pulsar::ConsumerConfiguration::setDeadLetterPolicy ( const DeadLetterPolicy &  deadLetterPolicy)

Set dead letter policy for consumer

By default, some messages may be redelivered many times, possibly without ever stopping. With the dead letter mechanism, messages have a maximum redelivery count; when they exceed the maximum number of redeliveries, they are sent to the dead letter topic and acknowledged automatically.

You can enable the dead letter mechanism by setting the dead letter policy. Example:

DeadLetterPolicy dlqPolicy = DeadLetterPolicyBuilder()
                      .maxRedeliverCount(10)
                      .build();

The default dead letter topic name is {TopicName}-{Subscription}-DLQ. To set a custom dead letter topic name:

DeadLetterPolicy dlqPolicy = DeadLetterPolicyBuilder()
                      .deadLetterTopic("dlq-topic")
                      .maxRedeliverCount(10)
                      .initialSubscriptionName("init-sub-name")
                      .build();
Parameters
deadLetterPolicy: Default value is empty

◆ setExpireTimeOfIncompleteChunkedMessageMs()

ConsumerConfiguration& pulsar::ConsumerConfiguration::setExpireTimeOfIncompleteChunkedMessageMs ( long  expireTimeOfIncompleteChunkedMessageMs)

If the producer fails to publish all the chunks of a message, the consumer can expire the incomplete chunks when it has not received all of them within the expire time. Use value 0 to disable this feature.

Default: 60000, which means 1 minute

Parameters
expireTimeOfIncompleteChunkedMessageMs: expire time in milliseconds
Returns
Consumer Configuration

◆ setKeySharedPolicy()

ConsumerConfiguration& pulsar::ConsumerConfiguration::setKeySharedPolicy ( KeySharedPolicy  keySharedPolicy)

Set KeyShared subscription policy for consumer.

By default, a KeyShared subscription uses the auto-split hash range to maintain consumers. If you want a different KeyShared policy, set it with this method.

Parameters
keySharedPolicy: the KeySharedPolicy you want to specify

◆ setMaxPendingChunkedMessage()

ConsumerConfiguration& pulsar::ConsumerConfiguration::setMaxPendingChunkedMessage ( size_t  maxPendingChunkedMessage)

The consumer buffers chunked messages in memory until it receives all the chunks of the original message. While consuming chunked messages, chunks from the same message might not be contiguous in the stream and might be mixed with other messages' chunks, so the consumer has to maintain multiple buffers to manage chunks coming from different messages. This mainly happens when multiple publishers are publishing messages on the topic concurrently or a publisher failed to publish all chunks of a message.

E.g. M1-C1, M2-C1, M1-C2, M2-C2. Here, chunks M1-C1 and M1-C2 belong to the original message M1, and chunks M2-C1 and M2-C2 belong to message M2.

Buffering a large number of outstanding incomplete chunked messages can create memory pressure, which can be guarded against by providing this maxPendingChunkedMessage threshold. Once the consumer reaches this threshold, it drops the outstanding incomplete chunked messages, either by silently acking them or by asking the broker to redeliver them later by marking them unacked. See setAutoAckOldestChunkedMessageOnQueueFull.

If it's zero, the pending chunked messages will not be limited.

Default: 10

Parameters
maxPendingChunkedMessage: the maximum number of pending chunked messages

◆ setMaxTotalReceiverQueueSizeAcrossPartitions()

void pulsar::ConsumerConfiguration::setMaxTotalReceiverQueueSizeAcrossPartitions ( int  maxTotalReceiverQueueSizeAcrossPartitions)

Set the max total receiver queue size across partitions.

This setting is used to reduce the receiver queue size for individual partitions (see setReceiverQueueSize(int)) if the total exceeds this value (default: 50000).

Parameters
maxTotalReceiverQueueSizeAcrossPartitions: the max total receiver queue size across partitions

◆ setMessageListener()

ConsumerConfiguration& pulsar::ConsumerConfiguration::setMessageListener ( MessageListener  messageListener)

A message listener enables your application to configure how to process and acknowledge messages delivered. A listener will be called in order for every message received.

◆ setNegativeAckRedeliveryDelayMs()

void pulsar::ConsumerConfiguration::setNegativeAckRedeliveryDelayMs ( long  redeliveryDelayMillis)

Set the delay to wait before re-delivering messages that have failed to be processed.

When the application uses Consumer::negativeAcknowledge, the failed message will be redelivered after a fixed timeout. The default is 1 min.

Parameters
redeliveryDelayMillis: redelivery delay for failed messages, in milliseconds

◆ setPatternAutoDiscoveryPeriod()

void pulsar::ConsumerConfiguration::setPatternAutoDiscoveryPeriod ( int  periodInSeconds)

Set the period, in seconds, at which the PatternMultiTopicsConsumer will do a pattern auto discovery. The default value is 60 seconds. A value less than 0 disables auto discovery.

Parameters
periodInSeconds: period in seconds to do an auto discovery

◆ setPriorityLevel()

ConsumerConfiguration& pulsar::ConsumerConfiguration::setPriorityLevel ( int  priorityLevel)

Set the Priority Level for consumer (0 is the default value and means the highest priority).

Parameters
priorityLevel: the priority of this consumer
Returns
the ConsumerConfiguration instance

◆ setProperties()

ConsumerConfiguration& pulsar::ConsumerConfiguration::setProperties ( const std::map< std::string, std::string > &  properties)

Add all the properties in the provided map

◆ setProperty()

ConsumerConfiguration& pulsar::ConsumerConfiguration::setProperty ( const std::string &  name,
const std::string &  value 
)

Sets a new property on the consumer.

Parameters
name: the name of the property
value: the associated value

◆ setReadCompacted()

void pulsar::ConsumerConfiguration::setReadCompacted ( bool  compacted)

If enabled, the consumer reads messages from the compacted topics rather than reading the full message backlog of the topic. This means that if the topic has been compacted, the consumer only sees the latest value for each key in the topic, up until the point in the topic message backlog that has been compacted. Beyond that point, message is sent as normal.

readCompacted can only be enabled on subscriptions to persistent topics that have a single active consumer (for example, failover or exclusive subscriptions). Attempting to enable it on subscriptions to non-persistent topics or on a shared subscription causes the subscription call to fail.

Parameters
compacted: whether to read from the compacted topic

◆ setReceiverQueueSize()

void pulsar::ConsumerConfiguration::setReceiverQueueSize ( int  size)

Sets the size of the consumer receive queue.

The consumer receive queue controls how many messages can be accumulated by the consumer before the application calls receive(). Using a higher value may potentially increase the consumer throughput at the expense of bigger memory utilization.

Setting the consumer queue size to 0 decreases the throughput of the consumer by disabling pre-fetching of messages. This approach improves the message distribution on shared subscription by pushing messages only to the consumers that are ready to process them. Neither receive with timeout nor partitioned topics can be used if the consumer queue size is 0. The receive() function call should not be interrupted when the consumer queue size is 0.

The default value is 1000 messages, which is appropriate for most use cases.

Parameters
size: the new receiver queue size value

◆ setRegexSubscriptionMode()

ConsumerConfiguration& pulsar::ConsumerConfiguration::setRegexSubscriptionMode ( RegexSubscriptionMode  regexSubscriptionMode)

Determines which topics this consumer should be subscribed to - Persistent, Non-Persistent, or AllTopics. Only used with pattern subscriptions.

Parameters
regexSubscriptionMode: The default value is PersistentOnly.

◆ setReplicateSubscriptionStateEnabled()

void pulsar::ConsumerConfiguration::setReplicateSubscriptionStateEnabled ( bool  enabled)

Set whether the subscription status should be replicated. The default value is false.

Parameters
enabled: whether the subscription status should be replicated

◆ setSchema()

ConsumerConfiguration& pulsar::ConsumerConfiguration::setSchema ( const SchemaInfo &  schemaInfo)

Declare the schema of the data that this consumer will be accepting.

The schema will be checked against the schema of the topic, and the consumer creation will fail if it's not compatible.

Parameters
schemaInfo: the schema definition object

◆ setStartMessageIdInclusive()

ConsumerConfiguration& pulsar::ConsumerConfiguration::setStartMessageIdInclusive ( bool  startMessageIdInclusive)

Set the consumer to include the given position of any reset operation like Consumer::seek.

Default: false

Parameters
startMessageIdInclusive: whether to include the reset position

◆ setSubscriptionInitialPosition()

void pulsar::ConsumerConfiguration::setSubscriptionInitialPosition ( InitialPosition  subscriptionInitialPosition)

The default value is InitialPositionLatest.

Parameters
subscriptionInitialPosition: the initial position at which to set the cursor when subscribing to the topic for the first time

◆ setSubscriptionProperties()

ConsumerConfiguration& pulsar::ConsumerConfiguration::setSubscriptionProperties ( const std::map< std::string, std::string > &  subscriptionProperties)

Sets new subscription properties for this subscription. Notice: SubscriptionProperties are immutable, and consumers under the same subscription will fail to create a subscription if they use different properties.

Parameters
subscriptionProperties: all the subscription properties in the provided map

◆ setTickDurationInMs()

void pulsar::ConsumerConfiguration::setTickDurationInMs ( const uint64_t  milliSeconds)

Set the tick duration time that defines the granularity of the ack-timeout redelivery (in milliseconds).

The default value is 1000, which means 1 second.

Using a higher tick time reduces the memory overhead to track messages when the ack-timeout is set to a bigger value.

Parameters
milliSeconds: the tick duration time (in milliseconds)

◆ setUnAckedMessagesTimeoutMs()

void pulsar::ConsumerConfiguration::setUnAckedMessagesTimeoutMs ( const uint64_t  milliSeconds)

Set the timeout in milliseconds for unacknowledged messages, the timeout needs to be greater than 10 seconds. An Exception is thrown if the given value is less than 10000 (10 seconds). If a successful acknowledgement is not sent within the timeout all the unacknowledged messages are redelivered.

Default: 0, which means the tracker for unacknowledged messages is disabled.

Parameters
milliSeconds: the timeout in milliseconds

The documentation for this class was generated from the following file: