pulsar-client-cpp
Loading...
Searching...
No Matches
ConsumerConfiguration.h
1
19#ifndef PULSAR_CONSUMERCONFIGURATION_H_
20#define PULSAR_CONSUMERCONFIGURATION_H_
21
22#include <pulsar/ConsumerCryptoFailureAction.h>
23#include <pulsar/ConsumerEventListener.h>
24#include <pulsar/ConsumerInterceptor.h>
25#include <pulsar/ConsumerType.h>
26#include <pulsar/CryptoKeyReader.h>
27#include <pulsar/InitialPosition.h>
28#include <pulsar/KeySharedPolicy.h>
29#include <pulsar/Message.h>
30#include <pulsar/RegexSubscriptionMode.h>
31#include <pulsar/Result.h>
32#include <pulsar/Schema.h>
33#include <pulsar/TypedMessage.h>
34#include <pulsar/defines.h>
35
36#include <functional>
37#include <memory>
38
39#include "BatchReceivePolicy.h"
40#include "DeadLetterPolicy.h"
41
42namespace pulsar {
43
44class Consumer;
45class PulsarWrapper;
46class PulsarFriend;
47
49typedef std::vector<Message> Messages;
50typedef std::function<void(Result result)> ResultCallback;
51typedef std::function<void(Result, const Message& msg)> ReceiveCallback;
52typedef std::function<void(Result, const Messages& msgs)> BatchReceiveCallback;
53typedef std::function<void(Result result, MessageId messageId)> GetLastMessageIdCallback;
54
56typedef std::function<void(Consumer& consumer, const Message& msg)> MessageListener;
57
58typedef std::shared_ptr<ConsumerEventListener> ConsumerEventListenerPtr;
59
60struct ConsumerConfigurationImpl;
61
65class PULSAR_PUBLIC ConsumerConfiguration {
66 public:
71
77
87
91 const SchemaInfo& getSchema() const;
92
106
111
121
126
133
134 template <typename T>
135 ConsumerConfiguration& setTypedMessageListener(
136 std::function<void(Consumer&, const TypedMessage<T>&)> listener,
137 typename TypedMessage<T>::Decoder decoder) {
138 return setMessageListener([listener, decoder](Consumer& consumer, const Message& msg) {
139 listener(consumer, TypedMessage<T>{msg, decoder});
140 });
141 }
142
147
151 bool hasMessageListener() const;
152
157 ConsumerConfiguration& setConsumerEventListener(ConsumerEventListenerPtr eventListener);
158
162 ConsumerEventListenerPtr getConsumerEventListener() const;
163
168
190 void setReceiverQueueSize(int size);
191
196
205 void setMaxTotalReceiverQueueSizeAcrossPartitions(int maxTotalReceiverQueueSizeAcrossPartitions);
206
211
217 void setConsumerName(const std::string& consumerName);
218
222 const std::string& getConsumerName() const;
223
234 void setUnAckedMessagesTimeoutMs(const uint64_t milliSeconds);
235
240
252 void setTickDurationInMs(const uint64_t milliSeconds);
253
258
271 void setNegativeAckRedeliveryDelayMs(long redeliveryDelayMillis);
272
279
286 void setNegativeAckPrecisionBitCnt(int negativeAckPrecisionBitCnt);
287
294
303 void setAckGroupingTimeMs(long ackGroupingMillis);
304
311
318 void setAckGroupingMaxSize(long maxGroupingSize);
319
326
334 void setBrokerConsumerStatsCacheTimeInMs(const long cacheTimeInMs);
335
340
345
349 const CryptoKeyReaderPtr getCryptoKeyReader() const;
350
356 ConsumerConfiguration& setCryptoKeyReader(CryptoKeyReaderPtr cryptoKeyReader);
357
361 ConsumerCryptoFailureAction getCryptoFailureAction() const;
362
366 ConsumerConfiguration& setCryptoFailureAction(ConsumerCryptoFailureAction action);
367
371 bool isReadCompacted() const;
372
386 void setReadCompacted(bool compacted);
387
395 void setPatternAutoDiscoveryPeriod(int periodInSeconds);
396
401
409
414
421 void setSubscriptionInitialPosition(InitialPosition subscriptionInitialPosition);
422
426 InitialPosition getSubscriptionInitialPosition() const;
427
434 void setBatchReceivePolicy(const BatchReceivePolicy& batchReceivePolicy);
435
442
470 void setDeadLetterPolicy(const DeadLetterPolicy& deadLetterPolicy);
471
478
486
491
499 bool hasProperty(const std::string& name) const;
500
507 const std::string& getProperty(const std::string& name) const;
508
512 std::map<std::string, std::string>& getProperties() const;
513
519 ConsumerConfiguration& setProperty(const std::string& name, const std::string& value);
520
524 ConsumerConfiguration& setProperties(const std::map<std::string, std::string>& properties);
525
529 std::map<std::string, std::string>& getSubscriptionProperties() const;
530
539 const std::map<std::string, std::string>& subscriptionProperties);
540
548
552 int getPriorityLevel() const;
553
575 ConsumerConfiguration& setMaxPendingChunkedMessage(size_t maxPendingChunkedMessage);
576
581
593 bool autoAckOldestChunkedMessageOnQueueFull);
594
599
610 long expireTimeOfIncompleteChunkedMessageMs);
611
619
627 ConsumerConfiguration& setStartMessageIdInclusive(bool startMessageIdInclusive);
628
633
645
650
657 ConsumerConfiguration& intercept(const std::vector<ConsumerInterceptorPtr>& interceptors);
658
659 const std::vector<ConsumerInterceptorPtr>& getInterceptors() const;
660
671
676
686
690 bool isStartPaused() const;
691
692 friend class PulsarWrapper;
693 friend class PulsarFriend;
694
695 private:
696 std::shared_ptr<ConsumerConfigurationImpl> impl_;
697};
698} // namespace pulsar
699#endif /* PULSAR_CONSUMERCONFIGURATION_H_ */
Definition BatchReceivePolicy.h:52
Definition ConsumerConfiguration.h:65
ConsumerConfiguration & setProperties(const std::map< std::string, std::string > &properties)
ConsumerConfiguration & setPriorityLevel(int priorityLevel)
const DeadLetterPolicy & getDeadLetterPolicy() const
ConsumerConfiguration & setBatchIndexAckEnabled(bool enabled)
ConsumerType getConsumerType() const
long getBrokerConsumerStatsCacheTimeInMs() const
void setReceiverQueueSize(int size)
MessageListener getMessageListener() const
ConsumerCryptoFailureAction getCryptoFailureAction() const
long getUnAckedMessagesTimeoutMs() const
ConsumerConfiguration & intercept(const std::vector< ConsumerInterceptorPtr > &interceptors)
ConsumerConfiguration & setAutoAckOldestChunkedMessageOnQueueFull(bool autoAckOldestChunkedMessageOnQueueFull)
ConsumerConfiguration & setAckReceiptEnabled(bool ackReceiptEnabled)
void setBrokerConsumerStatsCacheTimeInMs(const long cacheTimeInMs)
size_t getMaxPendingChunkedMessage() const
bool isReplicateSubscriptionStateEnabled() const
bool isAutoAckOldestChunkedMessageOnQueueFull() const
ConsumerConfiguration & setConsumerType(ConsumerType consumerType)
int getPatternAutoDiscoveryPeriod() const
ConsumerConfiguration & setRegexSubscriptionMode(RegexSubscriptionMode regexSubscriptionMode)
void setBatchReceivePolicy(const BatchReceivePolicy &batchReceivePolicy)
void setPatternAutoDiscoveryPeriod(int periodInSeconds)
void setAckGroupingMaxSize(long maxGroupingSize)
std::map< std::string, std::string > & getProperties() const
long getExpireTimeOfIncompleteChunkedMessageMs() const
InitialPosition getSubscriptionInitialPosition() const
ConsumerConfiguration & setConsumerEventListener(ConsumerEventListenerPtr eventListener)
const CryptoKeyReaderPtr getCryptoKeyReader() const
void setReplicateSubscriptionStateEnabled(bool enabled)
RegexSubscriptionMode getRegexSubscriptionMode() const
ConsumerConfiguration & setSchema(const SchemaInfo &schemaInfo)
long getNegativeAckRedeliveryDelayMs() const
void setDeadLetterPolicy(const DeadLetterPolicy &deadLetterPolicy)
bool hasProperty(const std::string &name) const
const std::string & getProperty(const std::string &name) const
void setTickDurationInMs(const uint64_t milliSeconds)
const BatchReceivePolicy & getBatchReceivePolicy() const
bool isStartMessageIdInclusive() const
ConsumerConfiguration & setMessageListener(MessageListener messageListener)
void setConsumerName(const std::string &consumerName)
void setMaxTotalReceiverQueueSizeAcrossPartitions(int maxTotalReceiverQueueSizeAcrossPartitions)
ConsumerConfiguration & setMaxPendingChunkedMessage(size_t maxPendingChunkedMessage)
ConsumerConfiguration & setCryptoKeyReader(CryptoKeyReaderPtr cryptoKeyReader)
std::map< std::string, std::string > & getSubscriptionProperties() const
ConsumerConfiguration & setCryptoFailureAction(ConsumerCryptoFailureAction action)
void setSubscriptionInitialPosition(InitialPosition subscriptionInitialPosition)
int getNegativeAckPrecisionBitCnt() const
bool hasConsumerEventListener() const
int getMaxTotalReceiverQueueSizeAcrossPartitions() const
ConsumerConfiguration & setExpireTimeOfIncompleteChunkedMessageMs(long expireTimeOfIncompleteChunkedMessageMs)
ConsumerConfiguration & setKeySharedPolicy(const KeySharedPolicy &keySharedPolicy)
ConsumerConfiguration & setProperty(const std::string &name, const std::string &value)
const SchemaInfo & getSchema() const
void setUnAckedMessagesTimeoutMs(const uint64_t milliSeconds)
ConsumerEventListenerPtr getConsumerEventListener() const
ConsumerConfiguration & setStartPaused(bool startPaused)
void setNegativeAckPrecisionBitCnt(int negativeAckPrecisionBitCnt)
KeySharedPolicy getKeySharedPolicy() const
void setReadCompacted(bool compacted)
ConsumerConfiguration clone() const
ConsumerConfiguration & setSubscriptionProperties(const std::map< std::string, std::string > &subscriptionProperties)
void setNegativeAckRedeliveryDelayMs(long redeliveryDelayMillis)
const std::string & getConsumerName() const
void setAckGroupingTimeMs(long ackGroupingMillis)
ConsumerConfiguration & setStartMessageIdInclusive(bool startMessageIdInclusive)
Definition Consumer.h:37
Definition DeadLetterPolicy.h:36
Definition KeySharedPolicy.h:53
Definition Message.h:44
Definition MessageId.h:34
Definition Schema.h:147
Definition TypedMessage.h:28
Definition Authentication.h:31
Result
Definition Result.h:33
ConsumerType
Definition ConsumerType.h:26
std::function< void(Consumer &consumer, const Message &msg)> MessageListener
Callback definition for MessageListener.
Definition ConsumerConfiguration.h:56
RegexSubscriptionMode
Definition RegexSubscriptionMode.h:24
std::vector< Message > Messages
Callback definition for non-data operation.
Definition ConsumerConfiguration.h:49
std::function< void(Result result)> ResultCallback
Callback definition for non-data operation.
Definition ConsumerConfiguration.h:50