No Matches
Public Member Functions | Friends | List of all members
pulsar::ConsumerConfiguration Class Reference

#include <ConsumerConfiguration.h>

Public Member Functions

 ConsumerConfiguration (const ConsumerConfiguration &)
ConsumerConfiguration & operator= (const ConsumerConfiguration &)
ConsumerConfiguration clone () const
ConsumerConfiguration & setSchema (const SchemaInfo &schemaInfo)
const SchemaInfo & getSchema () const
ConsumerConfiguration & setConsumerType (ConsumerType consumerType)
ConsumerType getConsumerType () const
ConsumerConfiguration & setKeySharedPolicy (KeySharedPolicy keySharedPolicy)
KeySharedPolicy getKeySharedPolicy () const
ConsumerConfiguration & setMessageListener (MessageListener messageListener)
template<typename T >
ConsumerConfiguration & setTypedMessageListener (std::function< void(Consumer &, const TypedMessage< T > &)> listener, typename TypedMessage< T >::Decoder decoder)
MessageListener getMessageListener () const
bool hasMessageListener () const
ConsumerConfiguration & setConsumerEventListener (ConsumerEventListenerPtr eventListener)
ConsumerEventListenerPtr getConsumerEventListener () const
bool hasConsumerEventListener () const
void setReceiverQueueSize (int size)
int getReceiverQueueSize () const
void setMaxTotalReceiverQueueSizeAcrossPartitions (int maxTotalReceiverQueueSizeAcrossPartitions)
int getMaxTotalReceiverQueueSizeAcrossPartitions () const
void setConsumerName (const std::string &consumerName)
const std::string & getConsumerName () const
void setUnAckedMessagesTimeoutMs (const uint64_t milliSeconds)
long getUnAckedMessagesTimeoutMs () const
void setTickDurationInMs (const uint64_t milliSeconds)
long getTickDurationInMs () const
void setNegativeAckRedeliveryDelayMs (long redeliveryDelayMillis)
long getNegativeAckRedeliveryDelayMs () const
void setAckGroupingTimeMs (long ackGroupingMillis)
long getAckGroupingTimeMs () const
void setAckGroupingMaxSize (long maxGroupingSize)
long getAckGroupingMaxSize () const
void setBrokerConsumerStatsCacheTimeInMs (const long cacheTimeInMs)
long getBrokerConsumerStatsCacheTimeInMs () const
bool isEncryptionEnabled () const
const CryptoKeyReaderPtr getCryptoKeyReader () const
ConsumerConfiguration & setCryptoKeyReader (CryptoKeyReaderPtr cryptoKeyReader)
ConsumerCryptoFailureAction getCryptoFailureAction () const
ConsumerConfiguration & setCryptoFailureAction (ConsumerCryptoFailureAction action)
bool isReadCompacted () const
void setReadCompacted (bool compacted)
void setPatternAutoDiscoveryPeriod (int periodInSeconds)
int getPatternAutoDiscoveryPeriod () const
ConsumerConfiguration & setRegexSubscriptionMode (RegexSubscriptionMode regexSubscriptionMode)
RegexSubscriptionMode getRegexSubscriptionMode () const
void setSubscriptionInitialPosition (InitialPosition subscriptionInitialPosition)
InitialPosition getSubscriptionInitialPosition () const
void setBatchReceivePolicy (const BatchReceivePolicy &batchReceivePolicy)
const BatchReceivePolicy & getBatchReceivePolicy () const
void setDeadLetterPolicy (const DeadLetterPolicy &deadLetterPolicy)
const DeadLetterPolicy & getDeadLetterPolicy () const
void setReplicateSubscriptionStateEnabled (bool enabled)
bool isReplicateSubscriptionStateEnabled () const
bool hasProperty (const std::string &name) const
const std::string & getProperty (const std::string &name) const
std::map< std::string, std::string > & getProperties () const
ConsumerConfiguration & setProperty (const std::string &name, const std::string &value)
ConsumerConfiguration & setProperties (const std::map< std::string, std::string > &properties)
std::map< std::string, std::string > & getSubscriptionProperties () const
ConsumerConfiguration & setSubscriptionProperties (const std::map< std::string, std::string > &subscriptionProperties)
ConsumerConfiguration & setPriorityLevel (int priorityLevel)
int getPriorityLevel () const
ConsumerConfiguration & setMaxPendingChunkedMessage (size_t maxPendingChunkedMessage)
size_t getMaxPendingChunkedMessage () const
ConsumerConfiguration & setAutoAckOldestChunkedMessageOnQueueFull (bool autoAckOldestChunkedMessageOnQueueFull)
bool isAutoAckOldestChunkedMessageOnQueueFull () const
ConsumerConfiguration & setExpireTimeOfIncompleteChunkedMessageMs (long expireTimeOfIncompleteChunkedMessageMs)
long getExpireTimeOfIncompleteChunkedMessageMs () const
ConsumerConfiguration & setStartMessageIdInclusive (bool startMessageIdInclusive)
bool isStartMessageIdInclusive () const
ConsumerConfiguration & setBatchIndexAckEnabled (bool enabled)
bool isBatchIndexAckEnabled () const
ConsumerConfiguration & intercept (const std::vector< ConsumerInterceptorPtr > &interceptors)
const std::vector< ConsumerInterceptorPtr > & getInterceptors () const
ConsumerConfiguration & setAckReceiptEnabled (bool ackReceiptEnabled)
bool isAckReceiptEnabled () const


Friends

class PulsarWrapper
class PulsarFriend

Detailed Description

Class specifying the configuration of a consumer.

Member Function Documentation

◆ clone()

ConsumerConfiguration pulsar::ConsumerConfiguration::clone ( ) const

Create a new instance of ConsumerConfiguration with the same initial settings as the current one.

◆ getAckGroupingMaxSize()

long pulsar::ConsumerConfiguration::getAckGroupingMaxSize ( ) const

Get max number of grouped messages within one grouping time window.

max number of grouped messages within one grouping time window.

◆ getAckGroupingTimeMs()

long pulsar::ConsumerConfiguration::getAckGroupingTimeMs ( ) const

Get grouping time window in milliseconds.

grouping time window in milliseconds.

◆ getBatchReceivePolicy()

const BatchReceivePolicy & pulsar::ConsumerConfiguration::getBatchReceivePolicy ( ) const

Get batch receive policy.

batch receive policy

◆ getBrokerConsumerStatsCacheTimeInMs()

long pulsar::ConsumerConfiguration::getBrokerConsumerStatsCacheTimeInMs ( ) const
Returns: the configured timeout in milliseconds for caching BrokerConsumerStats.

◆ getConsumerEventListener()

ConsumerEventListenerPtr pulsar::ConsumerConfiguration::getConsumerEventListener ( ) const
the consumer event listener

◆ getConsumerName()

const std::string & pulsar::ConsumerConfiguration::getConsumerName ( ) const
the consumer name

◆ getConsumerType()

ConsumerType pulsar::ConsumerConfiguration::getConsumerType ( ) const
the consumer type

◆ getCryptoFailureAction()

ConsumerCryptoFailureAction pulsar::ConsumerConfiguration::getCryptoFailureAction ( ) const
the ConsumerCryptoFailureAction

◆ getCryptoKeyReader()

const CryptoKeyReaderPtr pulsar::ConsumerConfiguration::getCryptoKeyReader ( ) const
the shared pointer to CryptoKeyReader.

◆ getDeadLetterPolicy()

const DeadLetterPolicy & pulsar::ConsumerConfiguration::getDeadLetterPolicy ( ) const

Get dead letter policy.

dead letter policy

◆ getExpireTimeOfIncompleteChunkedMessageMs()

long pulsar::ConsumerConfiguration::getExpireTimeOfIncompleteChunkedMessageMs ( ) const

Get the expire time of incomplete chunked message in milliseconds

the expire time of incomplete chunked message in milliseconds

◆ getKeySharedPolicy()

KeySharedPolicy pulsar::ConsumerConfiguration::getKeySharedPolicy ( ) const
the KeyShared subscription policy

◆ getMaxPendingChunkedMessage()

size_t pulsar::ConsumerConfiguration::getMaxPendingChunkedMessage ( ) const

The associated getter of setMaxPendingChunkedMessage

◆ getMaxTotalReceiverQueueSizeAcrossPartitions()

int pulsar::ConsumerConfiguration::getMaxTotalReceiverQueueSizeAcrossPartitions ( ) const
the configured max total receiver queue size across partitions

◆ getMessageListener()

MessageListener pulsar::ConsumerConfiguration::getMessageListener ( ) const
the message listener

◆ getNegativeAckRedeliveryDelayMs()

long pulsar::ConsumerConfiguration::getNegativeAckRedeliveryDelayMs ( ) const

Get the configured delay to wait before re-delivering messages that have failed to be processed.

redelivery delay for failed messages

◆ getPatternAutoDiscoveryPeriod()

int pulsar::ConsumerConfiguration::getPatternAutoDiscoveryPeriod ( ) const
Returns: the time period at which the PatternMultiTopicsConsumer performs a pattern auto discovery

◆ getPriorityLevel()

int pulsar::ConsumerConfiguration::getPriorityLevel ( ) const
the configured priority for the consumer

◆ getProperties()

std::map< std::string, std::string > & pulsar::ConsumerConfiguration::getProperties ( ) const

Get all the properties attached to this consumer.

◆ getProperty()

const std::string & pulsar::ConsumerConfiguration::getProperty ( const std::string &  name) const

Get the value of a specific property

name: the name of the property
Returns: the value of the property or null if the property was not defined

◆ getReceiverQueueSize()

int pulsar::ConsumerConfiguration::getReceiverQueueSize ( ) const
the receiver queue size

◆ getRegexSubscriptionMode()

RegexSubscriptionMode pulsar::ConsumerConfiguration::getRegexSubscriptionMode ( ) const
the regex subscription mode for the pattern consumer.

◆ getSchema()

const SchemaInfo & pulsar::ConsumerConfiguration::getSchema ( ) const
the schema information declared for this consumer

◆ getSubscriptionInitialPosition()

InitialPosition pulsar::ConsumerConfiguration::getSubscriptionInitialPosition ( ) const
the configured InitialPosition for the consumer

◆ getSubscriptionProperties()

std::map< std::string, std::string > & pulsar::ConsumerConfiguration::getSubscriptionProperties ( ) const

Get all the subscription properties attached to this subscription.

◆ getTickDurationInMs()

long pulsar::ConsumerConfiguration::getTickDurationInMs ( ) const
the tick duration time (in milliseconds)

◆ getUnAckedMessagesTimeoutMs()

long pulsar::ConsumerConfiguration::getUnAckedMessagesTimeoutMs ( ) const
the configured timeout in milliseconds for unacked messages.

◆ hasConsumerEventListener()

bool pulsar::ConsumerConfiguration::hasConsumerEventListener ( ) const
true if the consumer event listener has been set

◆ hasMessageListener()

bool pulsar::ConsumerConfiguration::hasMessageListener ( ) const
true if the message listener has been set

◆ hasProperty()

bool pulsar::ConsumerConfiguration::hasProperty ( const std::string &  name) const

Check whether the consumer configuration has a specific property attached.

name: the name of the property to check
Returns: true if the consumer configuration has the specified property,
false if the property is not defined

◆ intercept()

ConsumerConfiguration & pulsar::ConsumerConfiguration::intercept ( const std::vector< ConsumerInterceptorPtr > &  interceptors)

Intercept the consumer

interceptors: the list of interceptors to intercept the consumer
Returns: Consumer Configuration

◆ isAckReceiptEnabled()

bool pulsar::ConsumerConfiguration::isAckReceiptEnabled ( ) const

The associated getter of setAckReceiptEnabled.

◆ isAutoAckOldestChunkedMessageOnQueueFull()

bool pulsar::ConsumerConfiguration::isAutoAckOldestChunkedMessageOnQueueFull ( ) const

The associated getter of setAutoAckOldestChunkedMessageOnQueueFull

◆ isBatchIndexAckEnabled()

bool pulsar::ConsumerConfiguration::isBatchIndexAckEnabled ( ) const

The associated getter of setBatchIndexAckEnabled

◆ isEncryptionEnabled()

bool pulsar::ConsumerConfiguration::isEncryptionEnabled ( ) const
true if encryption keys are added

◆ isReadCompacted()

bool pulsar::ConsumerConfiguration::isReadCompacted ( ) const
true if readCompacted is enabled

◆ isReplicateSubscriptionStateEnabled()

bool pulsar::ConsumerConfiguration::isReplicateSubscriptionStateEnabled ( ) const
whether the subscription status should be replicated

◆ isStartMessageIdInclusive()

bool pulsar::ConsumerConfiguration::isStartMessageIdInclusive ( ) const

The associated getter of setStartMessageIdInclusive

◆ setAckGroupingMaxSize()

void pulsar::ConsumerConfiguration::setAckGroupingMaxSize ( long  maxGroupingSize)

Set max number of grouped messages within one grouping time window. If it's set to a non-positive value, number of grouped messages is not limited. Default is 1000.

maxGroupingSize: max number of grouped messages within one grouping time window.

◆ setAckGroupingTimeMs()

void pulsar::ConsumerConfiguration::setAckGroupingTimeMs ( long  ackGroupingMillis)

Set time window in milliseconds for grouping message ACK requests. An ACK request is not sent to broker until the time window reaches its end, or the number of grouped messages reaches limit. Default is 100 milliseconds. If it's set to a non-positive value, ACK requests will be directly sent to broker without grouping.

ackGroupingMillis: time of ACK grouping window in milliseconds.

◆ setAckReceiptEnabled()

ConsumerConfiguration & pulsar::ConsumerConfiguration::setAckReceiptEnabled ( bool  ackReceiptEnabled)

Whether to receive the ACK receipt from broker.

By default, when Consumer::acknowledge is called, it won't wait for the corresponding response from the broker. After it's enabled, the acknowledge method will return a Result that indicates if the acknowledgment succeeded.

Default: false

◆ setAutoAckOldestChunkedMessageOnQueueFull()

ConsumerConfiguration & pulsar::ConsumerConfiguration::setAutoAckOldestChunkedMessageOnQueueFull ( bool  autoAckOldestChunkedMessageOnQueueFull)

Buffering a large number of outstanding uncompleted chunked messages can create memory pressure, which can be guarded against by providing the maxPendingChunkedMessage threshold. See setMaxPendingChunkedMessage. Once the consumer reaches this threshold, it drops the outstanding unchunked messages by silently acking them if autoAckOldestChunkedMessageOnQueueFull is true; otherwise it marks them for redelivery.

Default: false

autoAckOldestChunkedMessageOnQueueFull: whether to ack the discarded chunked message

◆ setBatchIndexAckEnabled()

ConsumerConfiguration & pulsar::ConsumerConfiguration::setBatchIndexAckEnabled ( bool  enabled)

Enable the batch index acknowledgment.

It should be noted that this option can only work when the broker side also enables the batch index acknowledgment. See the acknowledgmentAtBatchIndexLevelEnabled config in broker.conf.

Default: false

enabled: whether to enable the batch index acknowledgment

◆ setBatchReceivePolicy()

void pulsar::ConsumerConfiguration::setBatchReceivePolicy ( const BatchReceivePolicy &  batchReceivePolicy)

Set batch receive policy.

batchReceivePolicy: the default is {maxNumMessage: -1, maxNumBytes: 10 * 1024 * 1024, timeoutMs: 100}

◆ setBrokerConsumerStatsCacheTimeInMs()

void pulsar::ConsumerConfiguration::setBrokerConsumerStatsCacheTimeInMs ( const long  cacheTimeInMs)

Set the time duration for which the broker side consumer stats will be cached in the client.

Default: 30000, which means 30 seconds.

cacheTimeInMs: the cache time in milliseconds

◆ setConsumerEventListener()

ConsumerConfiguration & pulsar::ConsumerConfiguration::setConsumerEventListener ( ConsumerEventListenerPtr  eventListener)

An event listener enables your application to react to the consumer state change event (active or inactive).

◆ setConsumerName()

void pulsar::ConsumerConfiguration::setConsumerName ( const std::string &  consumerName)

Set the consumer name.


◆ setConsumerType()

ConsumerConfiguration & pulsar::ConsumerConfiguration::setConsumerType ( ConsumerType  consumerType)

Specify the consumer type. The consumer type enables specifying the type of subscription. In Exclusive subscription, only a single consumer is allowed to attach to the subscription. Other consumers will get an error message. In Shared subscription, multiple consumers will be able to use the same subscription name and the messages will be dispatched in a round robin fashion. In Failover subscription, a primary-failover subscription model allows for multiple consumers to attach to a single subscription, though only one of them will be “master” at a given time. Only the primary consumer will receive messages. When the primary consumer gets disconnected, one among the failover consumers will be promoted to primary and will start getting messages.

◆ setCryptoFailureAction()

ConsumerConfiguration & pulsar::ConsumerConfiguration::setCryptoFailureAction ( ConsumerCryptoFailureAction  action)

Set the ConsumerCryptoFailureAction.

◆ setCryptoKeyReader()

ConsumerConfiguration & pulsar::ConsumerConfiguration::setCryptoKeyReader ( CryptoKeyReaderPtr  cryptoKeyReader)

Set the shared pointer to CryptoKeyReader.

cryptoKeyReader: the shared pointer to CryptoKeyReader

◆ setDeadLetterPolicy()

void pulsar::ConsumerConfiguration::setDeadLetterPolicy ( const DeadLetterPolicy &  deadLetterPolicy)

Set dead letter policy for consumer

By default, some messages are redelivered many times, even to the extent that they can never be stopped. With the dead letter mechanism, messages have a maximum redelivery count; when they exceed the maximum number of redeliveries, messages are sent to dead letter topics and acknowledged automatically.

You can enable the dead letter mechanism by setting the dead letter policy. Example:

* DeadLetterPolicy dlqPolicy = DeadLetterPolicyBuilder()

Default dead letter topic name is {TopicName}-{Subscription}-DLQ. To set a custom dead letter topic name

DeadLetterPolicy dlqPolicy = DeadLetterPolicyBuilder()
deadLetterPolicy: Default value is empty

◆ setExpireTimeOfIncompleteChunkedMessageMs()

ConsumerConfiguration & pulsar::ConsumerConfiguration::setExpireTimeOfIncompleteChunkedMessageMs ( long  expireTimeOfIncompleteChunkedMessageMs)

If producer fails to publish all the chunks of a message then consumer can expire incomplete chunks if consumer won't be able to receive all chunks in expire times. Use value 0 to disable this feature.

Default: 60000, which means 1 minute

expireTimeOfIncompleteChunkedMessageMs: expire time in milliseconds
Returns: Consumer Configuration

◆ setKeySharedPolicy()

ConsumerConfiguration & pulsar::ConsumerConfiguration::setKeySharedPolicy ( KeySharedPolicy  keySharedPolicy)

Set KeyShared subscription policy for consumer.

By default, a KeyShared subscription uses the auto split hash range to maintain consumers. If you want to set a different KeyShared policy, you can do so as in the following example:

keySharedPolicy: the KeySharedPolicy you want to specify

◆ setMaxPendingChunkedMessage()

ConsumerConfiguration & pulsar::ConsumerConfiguration::setMaxPendingChunkedMessage ( size_t  maxPendingChunkedMessage)

The consumer buffers chunk messages in memory until it receives all the chunks of the original message. While consuming chunk messages, chunks from the same message might not be contiguous in the stream and might be mixed with other messages' chunks, so the consumer has to maintain multiple buffers to manage chunks coming from different messages. This mainly happens when multiple publishers are publishing messages on the topic concurrently or a publisher failed to publish all chunks of a message.

eg: M1-C1, M2-C1, M1-C2, M2-C2 Here, Messages M1-C1 and M1-C2 belong to original message M1, M2-C1 and M2-C2 belong to M2 message.

Buffering a large number of outstanding uncompleted chunked messages can create memory pressure, which can be guarded against by providing this maxPendingChunkedMessage threshold. Once the consumer reaches this threshold, it drops the outstanding unchunked messages, either by silently acking them or by marking them unacked so that the broker redelivers them later. See setAutoAckOldestChunkedMessageOnQueueFull.

If it's zero, the pending chunked messages will not be limited.

Default: 10

maxPendingChunkedMessage: the number of max pending chunked messages

◆ setMaxTotalReceiverQueueSizeAcrossPartitions()

void pulsar::ConsumerConfiguration::setMaxTotalReceiverQueueSizeAcrossPartitions ( int  maxTotalReceiverQueueSizeAcrossPartitions)

Set the max total receiver queue size across partitions.

This setting is used to reduce the receiver queue size for individual partitions (see setReceiverQueueSize(int)) if the total exceeds this value (default: 50000).


◆ setMessageListener()

ConsumerConfiguration & pulsar::ConsumerConfiguration::setMessageListener ( MessageListener  messageListener)

A message listener enables your application to configure how to process and acknowledge messages delivered. A listener will be called in order for every message received.

◆ setNegativeAckRedeliveryDelayMs()

void pulsar::ConsumerConfiguration::setNegativeAckRedeliveryDelayMs ( long  redeliveryDelayMillis)

Set the delay to wait before re-delivering messages that have failed to be processed.

When application uses Consumer#negativeAcknowledge(Message), the failed message will be redelivered after a fixed timeout. The default is 1 min.

redeliveryDelayMillis: redelivery delay for failed messages, in milliseconds
timeUnitunit in which the timeout is provided.
the consumer builder instance

◆ setPatternAutoDiscoveryPeriod()

void pulsar::ConsumerConfiguration::setPatternAutoDiscoveryPeriod ( int  periodInSeconds)

Set the time period in seconds at which the PatternMultiTopicsConsumer will do a pattern auto discovery. The default value is 60 seconds. A value less than 0 will disable auto discovery.

periodInSeconds: period in seconds to do an auto discovery

◆ setPriorityLevel()

ConsumerConfiguration & pulsar::ConsumerConfiguration::setPriorityLevel ( int  priorityLevel)

Set the Priority Level for consumer (0 is the default value and means the highest priority).

priorityLevel: the priority of this consumer
Returns: the ConsumerConfiguration instance

◆ setProperties()

ConsumerConfiguration & pulsar::ConsumerConfiguration::setProperties ( const std::map< std::string, std::string > &  properties)

Add all the properties in the provided map

◆ setProperty()

ConsumerConfiguration & pulsar::ConsumerConfiguration::setProperty ( const std::string &  name,
const std::string &  value )

Sets a new property on this consumer configuration.

name: the name of the property
value: the associated value

◆ setReadCompacted()

void pulsar::ConsumerConfiguration::setReadCompacted ( bool  compacted)

If enabled, the consumer reads messages from the compacted topics rather than reading the full message backlog of the topic. This means that if the topic has been compacted, the consumer only sees the latest value for each key in the topic, up until the point in the topic message backlog that has been compacted. Beyond that point, message is sent as normal.

readCompacted can only be enabled on subscriptions to persistent topics which have a single active consumer (for example, failover or exclusive subscriptions). Attempting to enable it on subscriptions to non-persistent topics or on a shared subscription leads to the subscription call failure.

compacted: whether to read from the compacted topic

◆ setReceiverQueueSize()

void pulsar::ConsumerConfiguration::setReceiverQueueSize ( int  size)

Sets the size of the consumer receive queue.

The consumer receive queue controls how many messages can be accumulated by the consumer before the application calls receive(). Using a higher value may potentially increase the consumer throughput at the expense of bigger memory utilization.

Setting the consumer queue size to 0 decreases the throughput of the consumer by disabling pre-fetching of messages. This approach improves the message distribution on shared subscription by pushing messages only to the consumers that are ready to process them. Neither receive with timeout nor partitioned topics can be used if the consumer queue size is 0. The receive() function call should not be interrupted when the consumer queue size is 0.

The default value is 1000 messages and it is appropriate for most use cases.

size: the new receiver queue size value

◆ setRegexSubscriptionMode()

ConsumerConfiguration & pulsar::ConsumerConfiguration::setRegexSubscriptionMode ( RegexSubscriptionMode  regexSubscriptionMode)

Determines which topics this consumer should be subscribed to - Persistent, Non-Persistent, or AllTopics. Only used with pattern subscriptions.

regexSubscriptionMode: The default value is PersistentOnly.

◆ setReplicateSubscriptionStateEnabled()

void pulsar::ConsumerConfiguration::setReplicateSubscriptionStateEnabled ( bool  enabled)

Set whether the subscription status should be replicated. The default value is false.

enabled: whether the subscription status should be replicated

◆ setSchema()

ConsumerConfiguration & pulsar::ConsumerConfiguration::setSchema ( const SchemaInfo &  schemaInfo)

Declare the schema of the data that this consumer will be accepting.

The schema will be checked against the schema of the topic, and the consumer creation will fail if it's not compatible.

schemaInfo: the schema definition object

◆ setStartMessageIdInclusive()

ConsumerConfiguration & pulsar::ConsumerConfiguration::setStartMessageIdInclusive ( bool  startMessageIdInclusive)

Set the consumer to include the given position of any reset operation like Consumer::seek.

Default: false

startMessageIdInclusive: whether to include the reset position

◆ setSubscriptionInitialPosition()

void pulsar::ConsumerConfiguration::setSubscriptionInitialPosition ( InitialPosition  subscriptionInitialPosition)

The default value is InitialPositionLatest.

subscriptionInitialPosition: the initial position at which to set the cursor when subscribing to the topic for the first time

◆ setSubscriptionProperties()

ConsumerConfiguration & pulsar::ConsumerConfiguration::setSubscriptionProperties ( const std::map< std::string, std::string > &  subscriptionProperties)

Sets new subscription properties for this subscription. Notice: SubscriptionProperties are immutable, and consumers under the same subscription will fail to create a subscription if they use different properties.

subscriptionProperties: all the subscription properties in the provided map

◆ setTickDurationInMs()

void pulsar::ConsumerConfiguration::setTickDurationInMs ( const uint64_t  milliSeconds)

Set the tick duration time that defines the granularity of the ack-timeout redelivery (in milliseconds).

The default value is 1000, which means 1 second.

Using a higher tick time reduces the memory overhead to track messages when the ack-timeout is set to a bigger value.

milliSeconds: the tick duration time (in milliseconds)

◆ setUnAckedMessagesTimeoutMs()

void pulsar::ConsumerConfiguration::setUnAckedMessagesTimeoutMs ( const uint64_t  milliSeconds)

Set the timeout in milliseconds for unacknowledged messages, the timeout needs to be greater than 10 seconds. An Exception is thrown if the given value is less than 10000 (10 seconds). If a successful acknowledgement is not sent within the timeout all the unacknowledged messages are redelivered.

Default: 0, which means the tracker for unacknowledged messages is disabled.

milliSeconds: the timeout in milliseconds

The documentation for this class was generated from the following file: