pulsar-client-cpp
ConsumerConfiguration.h
1 
19 #ifndef PULSAR_CONSUMERCONFIGURATION_H_
20 #define PULSAR_CONSUMERCONFIGURATION_H_
21 
22 #include <pulsar/ConsumerCryptoFailureAction.h>
23 #include <pulsar/ConsumerEventListener.h>
24 #include <pulsar/ConsumerInterceptor.h>
25 #include <pulsar/ConsumerType.h>
26 #include <pulsar/CryptoKeyReader.h>
27 #include <pulsar/InitialPosition.h>
28 #include <pulsar/KeySharedPolicy.h>
29 #include <pulsar/Message.h>
30 #include <pulsar/RegexSubscriptionMode.h>
31 #include <pulsar/Result.h>
32 #include <pulsar/Schema.h>
33 #include <pulsar/TypedMessage.h>
34 #include <pulsar/defines.h>
35 
36 #include <functional>
37 #include <memory>
38 
39 #include "BatchReceivePolicy.h"
40 #include "DeadLetterPolicy.h"
41 
42 namespace pulsar {
43 
// Forward declarations so this header does not need the full class definitions.
// Consumer appears as a reference parameter in the MessageListener signature;
// PulsarWrapper and PulsarFriend are named as friends of ConsumerConfiguration.
44 class Consumer;
45 class PulsarWrapper;
46 class PulsarFriend;
47 
/// A sequence of Message objects; this is the payload type of BatchReceiveCallback.
// NOTE(review): std::vector is used here, but <vector> is not among the #include lines visible
// in this header; presumably it arrives transitively through another include — confirm.
49 typedef std::vector<Message> Messages;
/// Callback that receives only a Result (no accompanying data).
50 typedef std::function<void(Result result)> ResultCallback;
/// Callback that receives a Result together with a single Message.
51 typedef std::function<void(Result, const Message& msg)> ReceiveCallback;
/// Callback that receives a Result together with a batch of messages (Messages).
52 typedef std::function<void(Result, const Messages& msgs)> BatchReceiveCallback;
/// Callback that receives a Result together with a MessageId passed by value.
53 typedef std::function<void(Result result, MessageId messageId)> GetLastMessageIdCallback;
54 
/// Listener callback that is handed the Consumer (by reference) and a Message (by const reference).
56 typedef std::function<void(Consumer& consumer, const Message& msg)> MessageListener;
57 
/// Shared-ownership pointer to a ConsumerEventListener.
58 typedef std::shared_ptr<ConsumerEventListener> ConsumerEventListenerPtr;
59 
/// Forward declaration of the implementation type that ConsumerConfiguration holds in `impl_`.
60 struct ConsumerConfigurationImpl;
61 
/// Configuration object for a pulsar::Consumer, exposed as getter/setter pairs.
/// All state sits behind the private `impl_` member (std::shared_ptr<ConsumerConfigurationImpl>).
// NOTE(review): setter return types are mixed; some return ConsumerConfiguration& and others
// return void, so only a subset of setters can be chained.
// NOTE(review): this listing jumps from line 66 to 71 to 76, so any constructors or copy
// operations declared on the skipped lines are not shown; confirm the copy semantics of `impl_`
// against the real header.
// NOTE(review): std::map and std::string are used below, but <map> and <string> are not among
// the #include lines visible in this header; presumably they arrive transitively — confirm.
65 class PULSAR_PUBLIC ConsumerConfiguration {
66  public:
71 
    /// Returns a ConsumerConfiguration by value. Presumably an independent copy of the current
    /// settings; confirm in the implementation.
76  ConsumerConfiguration clone() const;
77 
    /// Sets the schema description. Returns ConsumerConfiguration& (presumably *this, for chaining).
86  ConsumerConfiguration& setSchema(const SchemaInfo& schemaInfo);
87 
    /// Returns the schema description by const reference.
91  const SchemaInfo& getSchema() const;
92 
    /// Sets the ConsumerType. Returns ConsumerConfiguration& (presumably *this, for chaining).
105  ConsumerConfiguration& setConsumerType(ConsumerType consumerType);
106 
    /// Returns the ConsumerType.
110  ConsumerType getConsumerType() const;
111 
    /// Sets the KeySharedPolicy (taken by value). Returns ConsumerConfiguration&.
120  ConsumerConfiguration& setKeySharedPolicy(KeySharedPolicy keySharedPolicy);
121 
    /// Returns the KeySharedPolicy by value.
125  KeySharedPolicy getKeySharedPolicy() const;
126 
    /// Sets the MessageListener callback, i.e. a std::function<void(Consumer&, const Message&)>.
    /// Returns ConsumerConfiguration&.
132  ConsumerConfiguration& setMessageListener(MessageListener messageListener);
133 
    /// Typed-message convenience wrapper around setMessageListener().
    /// Builds a MessageListener lambda that captures `listener` and `decoder` by copy; on each call
    /// it constructs TypedMessage<T>{msg, decoder} from the incoming Message and passes it, together
    /// with the Consumer, to `listener`.
    /// @return the ConsumerConfiguration& returned by setMessageListener().
134  template <typename T>
135  ConsumerConfiguration& setTypedMessageListener(
136  std::function<void(Consumer&, const TypedMessage<T>&)> listener,
137  typename TypedMessage<T>::Decoder decoder) {
138  return setMessageListener([listener, decoder](Consumer& consumer, const Message& msg) {
139  listener(consumer, TypedMessage<T>{msg, decoder});
140  });
141  }
142 
    /// Returns the MessageListener by value (a copy of the std::function).
146  MessageListener getMessageListener() const;
147 
    /// Reports whether a message listener is present; the exact criterion lives in the implementation.
151  bool hasMessageListener() const;
152 
    /// Sets the consumer event listener (a shared pointer). Returns ConsumerConfiguration&.
157  ConsumerConfiguration& setConsumerEventListener(ConsumerEventListenerPtr eventListener);
158 
    /// Returns the consumer event listener pointer by value.
162  ConsumerEventListenerPtr getConsumerEventListener() const;
163 
    /// Reports whether a consumer event listener is present; criterion defined in the implementation.
167  bool hasConsumerEventListener() const;
168 
    /// Sets the receiver queue size.
    // NOTE(review): `int` admits negative values; any validation is not visible in this header.
190  void setReceiverQueueSize(int size);
191 
    /// Returns the receiver queue size.
195  int getReceiverQueueSize() const;
196 
    /// Sets the maximum total receiver queue size across partitions.
205  void setMaxTotalReceiverQueueSizeAcrossPartitions(int maxTotalReceiverQueueSizeAcrossPartitions);
206 
    /// Returns the maximum total receiver queue size across partitions.
210  int getMaxTotalReceiverQueueSizeAcrossPartitions() const;
211 
    /// Sets the consumer name.
217  void setConsumerName(const std::string& consumerName);
218 
    /// Returns the consumer name by const reference.
222  const std::string& getConsumerName() const;
223 
    /// Sets the unacked-messages timeout; the parameter name indicates milliseconds.
    // NOTE(review): the setter takes uint64_t while the getter returns long, so large values
    // cannot round-trip on platforms where long is narrower than 64 bits or when above LONG_MAX.
234  void setUnAckedMessagesTimeoutMs(const uint64_t milliSeconds);
235 
    /// Returns the unacked-messages timeout (milliseconds, per the name) as a long.
239  long getUnAckedMessagesTimeoutMs() const;
240 
    /// Sets the tick duration; the parameter name indicates milliseconds.
    // NOTE(review): same uint64_t-in / long-out mismatch as the unacked-messages timeout pair.
252  void setTickDurationInMs(const uint64_t milliSeconds);
253 
    /// Returns the tick duration (milliseconds, per the name) as a long.
257  long getTickDurationInMs() const;
258 
    /// Sets the negative-ack redelivery delay; the parameter name indicates milliseconds.
271  void setNegativeAckRedeliveryDelayMs(long redeliveryDelayMillis);
272 
    /// Returns the negative-ack redelivery delay (milliseconds, per the name).
278  long getNegativeAckRedeliveryDelayMs() const;
279 
    /// Sets the ack grouping time; the parameter name indicates milliseconds.
288  void setAckGroupingTimeMs(long ackGroupingMillis);
289 
    /// Returns the ack grouping time (milliseconds, per the name).
295  long getAckGroupingTimeMs() const;
296 
    /// Sets the maximum ack grouping size.
303  void setAckGroupingMaxSize(long maxGroupingSize);
304 
    /// Returns the maximum ack grouping size.
310  long getAckGroupingMaxSize() const;
311 
    /// Sets the broker consumer stats cache time; the parameter name indicates milliseconds.
319  void setBrokerConsumerStatsCacheTimeInMs(const long cacheTimeInMs);
320 
    /// Returns the broker consumer stats cache time (milliseconds, per the name).
324  long getBrokerConsumerStatsCacheTimeInMs() const;
325 
    /// Reports whether encryption is enabled; how this is derived is not visible in this header.
329  bool isEncryptionEnabled() const;
330 
    /// Returns the crypto key reader pointer by value.
    // NOTE(review): the top-level `const` on a by-value return has no effect for callers that copy
    // the result and prevents moving from it; kept as-is to leave the declared interface untouched.
334  const CryptoKeyReaderPtr getCryptoKeyReader() const;
335 
    /// Sets the crypto key reader. Returns ConsumerConfiguration&.
341  ConsumerConfiguration& setCryptoKeyReader(CryptoKeyReaderPtr cryptoKeyReader);
342 
    /// Returns the ConsumerCryptoFailureAction.
346  ConsumerCryptoFailureAction getCryptoFailureAction() const;
347 
    /// Sets the ConsumerCryptoFailureAction. Returns ConsumerConfiguration&.
351  ConsumerConfiguration& setCryptoFailureAction(ConsumerCryptoFailureAction action);
352 
    /// Returns the read-compacted flag.
356  bool isReadCompacted() const;
357 
    /// Sets the read-compacted flag.
371  void setReadCompacted(bool compacted);
372 
    /// Sets the pattern auto-discovery period; the parameter name indicates seconds.
380  void setPatternAutoDiscoveryPeriod(int periodInSeconds);
381 
    /// Returns the pattern auto-discovery period (seconds, per the setter's parameter name).
385  int getPatternAutoDiscoveryPeriod() const;
386 
    /// Sets the RegexSubscriptionMode. Returns ConsumerConfiguration&.
393  ConsumerConfiguration& setRegexSubscriptionMode(RegexSubscriptionMode regexSubscriptionMode);
394 
    /// Returns the RegexSubscriptionMode.
398  RegexSubscriptionMode getRegexSubscriptionMode() const;
399 
    /// Sets the subscription's InitialPosition.
406  void setSubscriptionInitialPosition(InitialPosition subscriptionInitialPosition);
407 
    /// Returns the subscription's InitialPosition.
411  InitialPosition getSubscriptionInitialPosition() const;
412 
    /// Sets the BatchReceivePolicy.
419  void setBatchReceivePolicy(const BatchReceivePolicy& batchReceivePolicy);
420 
    /// Returns the BatchReceivePolicy by const reference.
426  const BatchReceivePolicy& getBatchReceivePolicy() const;
427 
    /// Sets the DeadLetterPolicy.
455  void setDeadLetterPolicy(const DeadLetterPolicy& deadLetterPolicy);
456 
    /// Returns the DeadLetterPolicy by const reference.
462  const DeadLetterPolicy& getDeadLetterPolicy() const;
463 
    /// Sets the replicate-subscription-state flag.
470  void setReplicateSubscriptionStateEnabled(bool enabled);
471 
    /// Returns the replicate-subscription-state flag.
475  bool isReplicateSubscriptionStateEnabled() const;
476 
    /// Reports whether a property called `name` exists.
484  bool hasProperty(const std::string& name) const;
485 
    /// Returns the value of property `name` by const reference.
    // NOTE(review): the result for a missing `name` is not visible here; presumably callers should
    // check hasProperty() first — confirm in the implementation.
492  const std::string& getProperty(const std::string& name) const;
493 
    /// Returns the property map by reference.
    // NOTE(review): a const member function handing out a non-const std::map& lets callers obtain
    // a mutable map even through a const ConsumerConfiguration.
497  std::map<std::string, std::string>& getProperties() const;
498 
    /// Sets a single property from a name/value pair. Returns ConsumerConfiguration&.
504  ConsumerConfiguration& setProperty(const std::string& name, const std::string& value);
505 
    /// Sets properties from a map. Whether existing entries are replaced or merged is not visible
    /// here. Returns ConsumerConfiguration&.
509  ConsumerConfiguration& setProperties(const std::map<std::string, std::string>& properties);
510 
    /// Returns the subscription property map by reference.
    // NOTE(review): same const-method / non-const-reference pattern as getProperties().
514  std::map<std::string, std::string>& getSubscriptionProperties() const;
515 
    /// Sets the subscription properties from a map. Returns ConsumerConfiguration&.
523  ConsumerConfiguration& setSubscriptionProperties(
524  const std::map<std::string, std::string>& subscriptionProperties);
525 
    /// Sets the priority level. Returns ConsumerConfiguration&.
532  ConsumerConfiguration& setPriorityLevel(int priorityLevel);
533 
    /// Returns the priority level.
537  int getPriorityLevel() const;
538 
    /// Sets the maximum number of pending chunked messages. Returns ConsumerConfiguration&.
560  ConsumerConfiguration& setMaxPendingChunkedMessage(size_t maxPendingChunkedMessage);
561 
    /// Returns the maximum number of pending chunked messages.
565  size_t getMaxPendingChunkedMessage() const;
566 
    /// Sets the auto-ack-oldest-chunked-message-on-queue-full flag. Returns ConsumerConfiguration&.
577  ConsumerConfiguration& setAutoAckOldestChunkedMessageOnQueueFull(
578  bool autoAckOldestChunkedMessageOnQueueFull);
579 
    /// Returns the auto-ack-oldest-chunked-message-on-queue-full flag.
583  bool isAutoAckOldestChunkedMessageOnQueueFull() const;
584 
    /// Sets the expiry time of incomplete chunked messages; the name indicates milliseconds.
    /// Returns ConsumerConfiguration&.
594  ConsumerConfiguration& setExpireTimeOfIncompleteChunkedMessageMs(
595  long expireTimeOfIncompleteChunkedMessageMs);
596 
    /// Returns the expiry time of incomplete chunked messages (milliseconds, per the name).
603  long getExpireTimeOfIncompleteChunkedMessageMs() const;
604 
    /// Sets the start-message-id-inclusive flag. Returns ConsumerConfiguration&.
612  ConsumerConfiguration& setStartMessageIdInclusive(bool startMessageIdInclusive);
613 
    /// Returns the start-message-id-inclusive flag.
617  bool isStartMessageIdInclusive() const;
618 
    /// Sets the batch-index-ack flag. Returns ConsumerConfiguration&.
629  ConsumerConfiguration& setBatchIndexAckEnabled(bool enabled);
630 
    /// Returns the batch-index-ack flag.
634  bool isBatchIndexAckEnabled() const;
635 
    /// Supplies a list of consumer interceptors. Whether the list replaces or extends any earlier
    /// one is not visible here. Returns ConsumerConfiguration&.
642  ConsumerConfiguration& intercept(const std::vector<ConsumerInterceptorPtr>& interceptors);
643 
    /// Returns the consumer interceptors by const reference.
644  const std::vector<ConsumerInterceptorPtr>& getInterceptors() const;
645 
    /// Sets the ack-receipt flag. Returns ConsumerConfiguration&.
655  ConsumerConfiguration& setAckReceiptEnabled(bool ackReceiptEnabled);
656 
    /// Returns the ack-receipt flag.
660  bool isAckReceiptEnabled() const;
661 
    // These two classes are granted access to private members (i.e. `impl_`).
662  friend class PulsarWrapper;
663  friend class PulsarFriend;
664 
665  private:
    // Shared pointer to the implementation struct that stores the actual settings;
    // ConsumerConfigurationImpl is only forward-declared in this header.
666  std::shared_ptr<ConsumerConfigurationImpl> impl_;
667 };
668 } // namespace pulsar
669 #endif /* PULSAR_CONSUMERCONFIGURATION_H_ */
pulsar::KeySharedPolicy
Definition: KeySharedPolicy.h:52
pulsar::MessageId
Definition: MessageId.h:34
pulsar::Result
Result
Definition: Result.h:31
pulsar::Messages
std::vector< Message > Messages
Collection of messages (std::vector<Message>), as passed to BatchReceiveCallback.
Definition: ConsumerConfiguration.h:46
pulsar::DeadLetterPolicy
Definition: DeadLetterPolicy.h:36
pulsar::ConsumerConfiguration
Definition: ConsumerConfiguration.h:65
pulsar::BatchReceivePolicy
Definition: BatchReceivePolicy.h:52
pulsar::ConsumerType
ConsumerType
Definition: ConsumerType.h:23
pulsar::Message
Definition: Message.h:43
pulsar::TypedMessage
Definition: TypedMessage.h:28
pulsar::Consumer
Definition: Consumer.h:37
pulsar::SchemaInfo
Definition: Schema.h:146
pulsar
Definition: Authentication.h:31
pulsar::MessageListener
std::function< void(Consumer &consumer, const Message &msg)> MessageListener
Callback definition for MessageListener.
Definition: ConsumerConfiguration.h:56
pulsar::RegexSubscriptionMode
RegexSubscriptionMode
Definition: RegexSubscriptionMode.h:23
pulsar::ResultCallback
std::function< void(Result result)> ResultCallback
Callback definition for non-data operation.
Definition: ConsumerConfiguration.h:50