#include <ConsumerConfiguration.h>
Friends | |
class | PulsarWrapper |
class | PulsarFriend |
Class specifying the configuration of a consumer.
ConsumerConfiguration pulsar::ConsumerConfiguration::clone | ( | ) | const |
Create a new instance of ConsumerConfiguration with the same initial settings as the current one.
long pulsar::ConsumerConfiguration::getAckGroupingMaxSize | ( | ) | const |
Get max number of grouped messages within one grouping time window.
long pulsar::ConsumerConfiguration::getAckGroupingTimeMs | ( | ) | const |
Get grouping time window in milliseconds.
const BatchReceivePolicy & pulsar::ConsumerConfiguration::getBatchReceivePolicy | ( | ) | const |
Get batch receive policy.
long pulsar::ConsumerConfiguration::getBrokerConsumerStatsCacheTimeInMs | ( | ) | const |
ConsumerEventListenerPtr pulsar::ConsumerConfiguration::getConsumerEventListener | ( | ) | const |
const std::string & pulsar::ConsumerConfiguration::getConsumerName | ( | ) | const |
ConsumerType pulsar::ConsumerConfiguration::getConsumerType | ( | ) | const |
ConsumerCryptoFailureAction pulsar::ConsumerConfiguration::getCryptoFailureAction | ( | ) | const |
const CryptoKeyReaderPtr pulsar::ConsumerConfiguration::getCryptoKeyReader | ( | ) | const |
const DeadLetterPolicy & pulsar::ConsumerConfiguration::getDeadLetterPolicy | ( | ) | const |
Get dead letter policy.
long pulsar::ConsumerConfiguration::getExpireTimeOfIncompleteChunkedMessageMs | ( | ) | const |
Get the expire time of incomplete chunked message in milliseconds
KeySharedPolicy pulsar::ConsumerConfiguration::getKeySharedPolicy | ( | ) | const |
size_t pulsar::ConsumerConfiguration::getMaxPendingChunkedMessage | ( | ) | const |
The associated getter of setMaxPendingChunkedMessage
int pulsar::ConsumerConfiguration::getMaxTotalReceiverQueueSizeAcrossPartitions | ( | ) | const |
MessageListener pulsar::ConsumerConfiguration::getMessageListener | ( | ) | const |
long pulsar::ConsumerConfiguration::getNegativeAckRedeliveryDelayMs | ( | ) | const |
Get the configured delay to wait before re-delivering messages that have failed to be processed.
int pulsar::ConsumerConfiguration::getPatternAutoDiscoveryPeriod | ( | ) | const |
int pulsar::ConsumerConfiguration::getPriorityLevel | ( | ) | const |
std::map< std::string, std::string > & pulsar::ConsumerConfiguration::getProperties | ( | ) | const |
Get all the properties attached to this consumer.
const std::string & pulsar::ConsumerConfiguration::getProperty | ( | const std::string & | name | ) | const |
Get the value of a specific property
name | the name of the property |
int pulsar::ConsumerConfiguration::getReceiverQueueSize | ( | ) | const |
RegexSubscriptionMode pulsar::ConsumerConfiguration::getRegexSubscriptionMode | ( | ) | const |
const SchemaInfo & pulsar::ConsumerConfiguration::getSchema | ( | ) | const |
InitialPosition pulsar::ConsumerConfiguration::getSubscriptionInitialPosition | ( | ) | const |
Returns the InitialPosition for the consumer.
std::map< std::string, std::string > & pulsar::ConsumerConfiguration::getSubscriptionProperties | ( | ) | const |
Get all the subscription properties attached to this subscription.
long pulsar::ConsumerConfiguration::getTickDurationInMs | ( | ) | const |
long pulsar::ConsumerConfiguration::getUnAckedMessagesTimeoutMs | ( | ) | const |
bool pulsar::ConsumerConfiguration::hasConsumerEventListener | ( | ) | const |
bool pulsar::ConsumerConfiguration::hasMessageListener | ( | ) | const |
bool pulsar::ConsumerConfiguration::hasProperty | ( | const std::string & | name | ) | const |
Check whether the consumer has a specific property attached.
name | the name of the property to check |
ConsumerConfiguration & pulsar::ConsumerConfiguration::intercept | ( | const std::vector< ConsumerInterceptorPtr > & | interceptors | ) |
Intercept the consumer
interceptors | the list of interceptors to intercept the consumer |
bool pulsar::ConsumerConfiguration::isAckReceiptEnabled | ( | ) | const |
The associated getter of setAckReceiptEnabled.
bool pulsar::ConsumerConfiguration::isAutoAckOldestChunkedMessageOnQueueFull | ( | ) | const |
The associated getter of setAutoAckOldestChunkedMessageOnQueueFull
bool pulsar::ConsumerConfiguration::isBatchIndexAckEnabled | ( | ) | const |
The associated getter of setBatchIndexAckEnabled
bool pulsar::ConsumerConfiguration::isEncryptionEnabled | ( | ) | const |
bool pulsar::ConsumerConfiguration::isReadCompacted | ( | ) | const |
bool pulsar::ConsumerConfiguration::isReplicateSubscriptionStateEnabled | ( | ) | const |
bool pulsar::ConsumerConfiguration::isStartMessageIdInclusive | ( | ) | const |
The associated getter of setStartMessageIdInclusive
void pulsar::ConsumerConfiguration::setAckGroupingMaxSize | ( | long | maxGroupingSize | ) |
Set max number of grouped messages within one grouping time window. If it's set to a non-positive value, number of grouped messages is not limited. Default is 1000.
maxGroupingSize | max number of grouped messages within one grouping time window. |
void pulsar::ConsumerConfiguration::setAckGroupingTimeMs | ( | long | ackGroupingMillis | ) |
Set time window in milliseconds for grouping message ACK requests. An ACK request is not sent to broker until the time window reaches its end, or the number of grouped messages reaches limit. Default is 100 milliseconds. If it's set to a non-positive value, ACK requests will be directly sent to broker without grouping.
ackGroupingMillis | time of ACK grouping window in milliseconds. |
ConsumerConfiguration & pulsar::ConsumerConfiguration::setAckReceiptEnabled | ( | bool | ackReceiptEnabled | ) |
Whether to receive the ACK receipt from broker.
By default, when Consumer::acknowledge is called, it won't wait for the corresponding response from the broker. After it's enabled, the acknowledge method will return a Result that indicates whether the acknowledgment succeeded.
Default: false
ConsumerConfiguration & pulsar::ConsumerConfiguration::setAutoAckOldestChunkedMessageOnQueueFull | ( | bool | autoAckOldestChunkedMessageOnQueueFull | ) |
Buffering a large number of outstanding incomplete chunked messages can create memory pressure, which can be guarded against by providing the maxPendingChunkedMessage threshold. See setMaxPendingChunkedMessage. Once the consumer reaches this threshold, it drops the outstanding incomplete chunked messages by silently acking them if autoAckOldestChunkedMessageOnQueueFull is true; otherwise it marks them for redelivery.
Default: false
autoAckOldestChunkedMessageOnQueueFull | whether to ack the discarded chunked message |
ConsumerConfiguration & pulsar::ConsumerConfiguration::setBatchIndexAckEnabled | ( | bool | enabled | ) |
Enable the batch index acknowledgment.
It should be noted that this option can only work when the broker side also enables the batch index acknowledgment. See the acknowledgmentAtBatchIndexLevelEnabled config in broker.conf.
Default: false
enabled | whether to enable the batch index acknowledgment |
void pulsar::ConsumerConfiguration::setBatchReceivePolicy | ( | const BatchReceivePolicy & | batchReceivePolicy | ) |
Set batch receive policy.
batchReceivePolicy | the default is {maxNumMessage: -1, maxNumBytes: 10 * 1024 * 1024, timeoutMs: 100} |
void pulsar::ConsumerConfiguration::setBrokerConsumerStatsCacheTimeInMs | ( | const long | cacheTimeInMs | ) |
Set the time duration for which the broker side consumer stats will be cached in the client.
Default: 30000, which means 30 seconds.
cacheTimeInMs | in milliseconds |
ConsumerConfiguration & pulsar::ConsumerConfiguration::setConsumerEventListener | ( | ConsumerEventListenerPtr | eventListener | ) |
An event listener enables your application to react to the consumer state change event (active or inactive).
void pulsar::ConsumerConfiguration::setConsumerName | ( | const std::string & | consumerName | ) |
Set the consumer name.
consumerName |
ConsumerConfiguration & pulsar::ConsumerConfiguration::setConsumerType | ( | ConsumerType | consumerType | ) |
Specify the consumer type. The consumer type enables specifying the type of subscription. In Exclusive subscription, only a single consumer is allowed to attach to the subscription. Other consumers will get an error message. In Shared subscription, multiple consumers will be able to use the same subscription name and the messages will be dispatched in a round robin fashion. In Failover subscription, a primary-failover subscription model allows for multiple consumers to attach to a single subscription, though only one of them will be the primary at a given time. Only the primary consumer will receive messages. When the primary consumer gets disconnected, one among the failover consumers will be promoted to primary and will start getting messages.
ConsumerConfiguration & pulsar::ConsumerConfiguration::setCryptoFailureAction | ( | ConsumerCryptoFailureAction | action | ) |
Set the ConsumerCryptoFailureAction.
ConsumerConfiguration & pulsar::ConsumerConfiguration::setCryptoKeyReader | ( | CryptoKeyReaderPtr | cryptoKeyReader | ) |
Set the shared pointer to CryptoKeyReader.
cryptoKeyReader | the shared pointer to CryptoKeyReader |
void pulsar::ConsumerConfiguration::setDeadLetterPolicy | ( | const DeadLetterPolicy & | deadLetterPolicy | ) |
Set dead letter policy for consumer
By default, some messages are redelivered many times, possibly without ever stopping. With the dead letter mechanism, messages have a maximum redelivery count; when they exceed the maximum number of redeliveries, they are sent to the dead letter topic and acknowledged automatically.
You can enable the dead letter mechanism by setting the dead letter policy. Example:
DeadLetterPolicy dlqPolicy = DeadLetterPolicyBuilder() .maxRedeliverCount(10) .build();
The default dead letter topic name is {TopicName}-{Subscription}-DLQ. To set a custom dead letter topic name:
DeadLetterPolicy dlqPolicy = DeadLetterPolicyBuilder() .deadLetterTopic("dlq-topic") .maxRedeliverCount(10) .initialSubscriptionName("init-sub-name") .build();
deadLetterPolicy | Default value is empty |
ConsumerConfiguration & pulsar::ConsumerConfiguration::setExpireTimeOfIncompleteChunkedMessageMs | ( | long | expireTimeOfIncompleteChunkedMessageMs | ) |
If the producer fails to publish all the chunks of a message, the consumer can expire the incomplete chunks if it is not able to receive all chunks within the expire time. Use value 0 to disable this feature.
Default: 60000, which means 1 minute
expireTimeOfIncompleteChunkedMessageMs | expire time in milliseconds |
ConsumerConfiguration & pulsar::ConsumerConfiguration::setKeySharedPolicy | ( | KeySharedPolicy | keySharedPolicy | ) |
Set KeyShared subscription policy for consumer.
By default, a KeyShared subscription uses auto split hash range to maintain consumers. If you want to use a different KeyShared policy, pass it to this method.
keySharedPolicy | the KeySharedPolicy to use |
ConsumerConfiguration & pulsar::ConsumerConfiguration::setMaxPendingChunkedMessage | ( | size_t | maxPendingChunkedMessage | ) |
The consumer buffers chunked messages in memory until it receives all the chunks of the original message. While consuming chunked messages, chunks from the same message might not be contiguous in the stream and they might be mixed with other messages' chunks. So the consumer has to maintain multiple buffers to manage chunks coming from different messages. This mainly happens when multiple publishers are publishing messages on the topic concurrently or a publisher failed to publish all chunks of a message.
E.g.: M1-C1, M2-C1, M1-C2, M2-C2. Here, chunks M1-C1 and M1-C2 belong to the original message M1, and M2-C1 and M2-C2 belong to the original message M2.
Buffering a large number of outstanding incomplete chunked messages can create memory pressure, which can be guarded against by providing this maxPendingChunkedMessage threshold. Once the consumer reaches this threshold, it drops the outstanding incomplete chunked messages by silently acking them or by asking the broker to redeliver them later by marking them unacked. See setAutoAckOldestChunkedMessageOnQueueFull.
If it's zero, the pending chunked messages will not be limited.
Default: 10
maxPendingChunkedMessage | the number of max pending chunked messages |
void pulsar::ConsumerConfiguration::setMaxTotalReceiverQueueSizeAcrossPartitions | ( | int | maxTotalReceiverQueueSizeAcrossPartitions | ) |
Set the max total receiver queue size across partitions.
This setting is used to reduce the receiver queue size for individual partitions (see setReceiverQueueSize(int)) if the total exceeds this value (default: 50000).
maxTotalReceiverQueueSizeAcrossPartitions |
ConsumerConfiguration & pulsar::ConsumerConfiguration::setMessageListener | ( | MessageListener | messageListener | ) |
A message listener enables your application to configure how to process and acknowledge messages delivered. A listener will be called in order for every message received.
void pulsar::ConsumerConfiguration::setNegativeAckRedeliveryDelayMs | ( | long | redeliveryDelayMillis | ) |
Set the delay to wait before re-delivering messages that have failed to be processed.
When the application uses Consumer::negativeAcknowledge(Message), the failed message will be redelivered after a fixed timeout. The default is 1 min.
redeliveryDelayMillis | redelivery delay for failed messages, in milliseconds (the unit is fixed; there is no separate time-unit parameter) |
void pulsar::ConsumerConfiguration::setPatternAutoDiscoveryPeriod | ( | int | periodInSeconds | ) |
Set the time duration in seconds, for which the PatternMultiTopicsConsumer will do a pattern auto discovery. The default value is 60 seconds. A value less than 0 will disable auto discovery.
periodInSeconds | period in seconds to do an auto discovery |
ConsumerConfiguration & pulsar::ConsumerConfiguration::setPriorityLevel | ( | int | priorityLevel | ) |
Set the Priority Level for consumer (0 is the default value and means the highest priority).
priorityLevel | the priority of this consumer |
ConsumerConfiguration & pulsar::ConsumerConfiguration::setProperties | ( | const std::map< std::string, std::string > & | properties | ) |
Add all the properties in the provided map
ConsumerConfiguration & pulsar::ConsumerConfiguration::setProperty | ( | const std::string & | name, |
const std::string & | value | ||
) |
Sets a new property on the consumer.
name | the name of the property |
value | the associated value |
void pulsar::ConsumerConfiguration::setReadCompacted | ( | bool | compacted | ) |
If enabled, the consumer reads messages from the compacted topics rather than reading the full message backlog of the topic. This means that if the topic has been compacted, the consumer only sees the latest value for each key in the topic, up until the point in the topic message backlog that has been compacted. Beyond that point, message is sent as normal.
readCompacted can only be enabled on subscriptions to persistent topics that have a single active consumer (for example, failover or exclusive subscriptions). Attempting to enable it on subscriptions to non-persistent topics or on a shared subscription leads to the subscription call failure.
compacted | whether to read from the compacted topic |
void pulsar::ConsumerConfiguration::setReceiverQueueSize | ( | int | size | ) |
Sets the size of the consumer receive queue.
The consumer receive queue controls how many messages can be accumulated by the consumer before the application calls receive(). Using a higher value may potentially increase the consumer throughput at the expense of bigger memory utilization.
Setting the consumer queue size to 0 decreases the throughput of the consumer by disabling pre-fetching of messages. This approach improves the message distribution on shared subscription by pushing messages only to the consumers that are ready to process them. Neither receive with timeout nor partitioned topics can be used if the consumer queue size is 0. The receive() function call should not be interrupted when the consumer queue size is 0.
The default value is 1000 messages and it is appropriate for most use cases.
size | the new receiver queue size value |
ConsumerConfiguration & pulsar::ConsumerConfiguration::setRegexSubscriptionMode | ( | RegexSubscriptionMode | regexSubscriptionMode | ) |
Determines which topics this consumer should be subscribed to - Persistent, Non-Persistent, or AllTopics. Only used with pattern subscriptions.
regexSubscriptionMode | The default value is PersistentOnly. |
void pulsar::ConsumerConfiguration::setReplicateSubscriptionStateEnabled | ( | bool | enabled | ) |
Set whether the subscription status should be replicated. The default value is false.
enabled | whether the subscription status should be replicated |
ConsumerConfiguration & pulsar::ConsumerConfiguration::setSchema | ( | const SchemaInfo & | schemaInfo | ) |
Declare the schema of the data that this consumer will be accepting.
The schema will be checked against the schema of the topic, and the consumer creation will fail if it's not compatible.
schemaInfo | the schema definition object |
ConsumerConfiguration & pulsar::ConsumerConfiguration::setStartMessageIdInclusive | ( | bool | startMessageIdInclusive | ) |
Set the consumer to include the given position of any reset operation like Consumer::seek.
Default: false
startMessageIdInclusive | whether to include the reset position |
void pulsar::ConsumerConfiguration::setSubscriptionInitialPosition | ( | InitialPosition | subscriptionInitialPosition | ) |
The default value is InitialPositionLatest.
subscriptionInitialPosition | the initial position at which to set the cursor when subscribing to the topic for the first time |
ConsumerConfiguration & pulsar::ConsumerConfiguration::setSubscriptionProperties | ( | const std::map< std::string, std::string > & | subscriptionProperties | ) |
Sets new subscription properties for this subscription. Notice: SubscriptionProperties are immutable, and consumers under the same subscription will fail to create a subscription if they use different properties.
subscriptionProperties | all the subscription properties in the provided map |
void pulsar::ConsumerConfiguration::setTickDurationInMs | ( | const uint64_t | milliSeconds | ) |
Set the tick duration time that defines the granularity of the ack-timeout redelivery (in milliseconds).
The default value is 1000, which means 1 second.
Using a higher tick time reduces the memory overhead to track messages when the ack-timeout is set to a bigger value.
milliSeconds | the tick duration time (in milliseconds) |
void pulsar::ConsumerConfiguration::setUnAckedMessagesTimeoutMs | ( | const uint64_t | milliSeconds | ) |
Set the timeout in milliseconds for unacknowledged messages. The timeout needs to be greater than 10 seconds. An Exception is thrown if the given value is less than 10000 (10 seconds). If a successful acknowledgement is not sent within the timeout, all the unacknowledged messages are redelivered.
Default: 0, which means the tracker for unacknowledged messages is disabled.
milliSeconds | the timeout in milliseconds |