pulsar-client-cpp
|
Public Member Functions | |
Consumer () | |
const std::string & | getTopic () const |
const std::string & | getSubscriptionName () const |
Result | unsubscribe () |
void | unsubscribeAsync (ResultCallback callback) |
Result | receive (Message &msg) |
template<typename T > | |
Result | receive (TypedMessage< T > &msg, typename TypedMessage< T >::Decoder decoder) |
Result | receive (Message &msg, int timeoutMs) |
template<typename T > | |
Result | receive (TypedMessage< T > &msg, int timeoutMs, typename TypedMessage< T >::Decoder decoder) |
void | receiveAsync (ReceiveCallback callback) |
template<typename T > | |
void | receiveAsync (std::function< void(Result result, const TypedMessage< T > &)> callback, typename TypedMessage< T >::Decoder decoder) |
Result | batchReceive (Messages &msgs) |
void | batchReceiveAsync (BatchReceiveCallback callback) |
Result | acknowledge (const Message &message) |
Result | acknowledge (const MessageId &messageId) |
Result | acknowledge (const MessageIdList &messageIdList) |
void | acknowledgeAsync (const Message &message, ResultCallback callback) |
void | acknowledgeAsync (const MessageId &messageId, ResultCallback callback) |
void | acknowledgeAsync (const MessageIdList &messageIdList, ResultCallback callback) |
Result | acknowledgeCumulative (const Message &message) |
Result | acknowledgeCumulative (const MessageId &messageId) |
void | acknowledgeCumulativeAsync (const Message &message, ResultCallback callback) |
void | acknowledgeCumulativeAsync (const MessageId &messageId, ResultCallback callback) |
void | negativeAcknowledge (const Message &message) |
void | negativeAcknowledge (const MessageId &messageId) |
Result | close () |
void | closeAsync (ResultCallback callback) |
Result | pauseMessageListener () |
Result | resumeMessageListener () |
void | redeliverUnacknowledgedMessages () |
Result | getBrokerConsumerStats (BrokerConsumerStats &brokerConsumerStats) |
void | getBrokerConsumerStatsAsync (BrokerConsumerStatsCallback callback) |
Result | seek (const MessageId &messageId) |
Result | seek (uint64_t timestamp) |
virtual void | seekAsync (const MessageId &messageId, ResultCallback callback) |
virtual void | seekAsync (uint64_t timestamp, ResultCallback callback) |
bool | isConnected () const |
void | getLastMessageIdAsync (GetLastMessageIdCallback callback) |
Result | getLastMessageId (MessageId &messageId) |
Friends | |
class | PulsarFriend |
class | PulsarWrapper |
class | MultiTopicsConsumerImpl |
class | ConsumerImpl |
class | ClientImpl |
class | ConsumerTest |
pulsar::Consumer::Consumer | ( | ) |
Construct an uninitialized consumer object
Acknowledge the reception of a single message.
This method will block until an acknowledgement is sent to the broker. After that, the message will not be re-delivered to this consumer.
message | the message to acknowledge |
Acknowledge the reception of a single message.
This method is blocked until an acknowledgement is sent to the broker. After that, the message is not re-delivered to the consumer.
messageId | the MessageId to acknowledge |
Result pulsar::Consumer::acknowledge | ( | const MessageIdList & | messageIdList | ) |
Acknowledge the consumption of a list of message.
messageIdList |
void pulsar::Consumer::acknowledgeAsync | ( | const Message & | message, |
ResultCallback | callback | ||
) |
Asynchronously acknowledge the reception of a single message.
This method will initiate the operation and return immediately. The provided callback will be triggered when the operation is complete.
message | the message to acknowledge |
callback | callback that will be triggered when the message has been acknowledged |
void pulsar::Consumer::acknowledgeAsync | ( | const MessageId & | messageId, |
ResultCallback | callback | ||
) |
Asynchronously acknowledge the reception of a single message.
This method initiates the operation and returns the result immediately. The provided callback is triggered when the operation is completed.
messageId | the messageId to acknowledge |
callback | the callback that is triggered when the message has been acknowledged or not |
void pulsar::Consumer::acknowledgeAsync | ( | const MessageIdList & | messageIdList, |
ResultCallback | callback | ||
) |
Asynchronously acknowledge the consumption of a list of message.
messageIdList | |
callback | the callback that is triggered when the message has been acknowledged or not |
Acknowledge the reception of all the messages in the stream up to (and including) the provided message.
This method will block until an acknowledgement is sent to the broker. After that, the messages will not be re-delivered to this consumer.
Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
It's equivalent to calling asyncAcknowledgeCumulative(const Message&, ResultCallback) and waiting for the callback to be triggered.
message | the last message in the stream to acknowledge |
Acknowledge the reception of all the messages in the stream up to (and including) the provided message.
This method is blocked until an acknowledgement is sent to the broker. After that, the message is not re-delivered to this consumer.
Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
It is equivalent to calling the asyncAcknowledgeCumulative(const Message&, ResultCallback) method and waiting for the callback to be triggered.
messageId | the last messageId in the stream to acknowledge |
void pulsar::Consumer::acknowledgeCumulativeAsync | ( | const Message & | message, |
ResultCallback | callback | ||
) |
Asynchronously acknowledge the reception of all the messages in the stream up to (and including) the provided message.
This method will initiate the operation and return immediately. The provided callback will be triggered when the operation is complete.
message | the message to acknowledge |
callback | callback that will be triggered when the message has been acknowledged |
void pulsar::Consumer::acknowledgeCumulativeAsync | ( | const MessageId & | messageId, |
ResultCallback | callback | ||
) |
Asynchronously acknowledge the reception of all the messages in the stream up to (and including) the provided message.
This method initiates the operation and returns the result immediately. The provided callback is triggered when the operation is completed.
messageId | the messageId to acknowledge |
callback | the callback that is triggered when the message has been acknowledged or not |
Batch receiving messages.
This calls blocks until has enough messages or wait timeout, more details to see BatchReceivePolicy
.
msgs | a non-const reference where the received messages will be copied |
void pulsar::Consumer::batchReceiveAsync | ( | BatchReceiveCallback | callback | ) |
Async Batch receiving messages.
Retrieves a message when it will be available and completes callback with received message.
batchReceiveAsync() should be called subsequently once callback gets completed with received message. Else it creates backlog of receive requests in the application.
BatchReceiveCallback | will be completed when messages are available. |
Result pulsar::Consumer::close | ( | ) |
Close the consumer and stop the broker to push more messages
void pulsar::Consumer::closeAsync | ( | ResultCallback | callback | ) |
Asynchronously close the consumer and stop the broker to push more messages
Result pulsar::Consumer::getBrokerConsumerStats | ( | BrokerConsumerStats & | brokerConsumerStats | ) |
Gets Consumer Stats from broker. The stats are cached for 30 seconds, if a call is made before the stats returned by the previous call expires then cached data will be returned. BrokerConsumerStats::isValid() function can be used to check if the stats are still valid.
brokerConsumerStats | - if the function returns ResultOk, this object will contain consumer stats |
void pulsar::Consumer::getBrokerConsumerStatsAsync | ( | BrokerConsumerStatsCallback | callback | ) |
Asynchronous call to gets Consumer Stats from broker. The stats are cached for 30 seconds, if a call is made before the stats returned by the previous call expires then cached data will be returned. BrokerConsumerStats::isValid() function can be used to check if the stats are still valid.
callback | - callback function to get the brokerConsumerStats, if result is ResultOk then the brokerConsumerStats will be populated |
Get an ID of the last available message or a message ID with -1 as an entryId if the topic is empty.
void pulsar::Consumer::getLastMessageIdAsync | ( | GetLastMessageIdCallback | callback | ) |
Asynchronously get an ID of the last available message or a message ID with -1 as an entryId if the topic is empty.
const std::string & pulsar::Consumer::getSubscriptionName | ( | ) | const |
const std::string & pulsar::Consumer::getTopic | ( | ) | const |
bool pulsar::Consumer::isConnected | ( | ) | const |
void pulsar::Consumer::negativeAcknowledge | ( | const Message & | message | ) |
Acknowledge the failure to process a single message.
When a message is "negatively acked" it will be marked for redelivery after some fixed delay. The delay is configurable when constructing the consumer with ConsumerConfiguration#setNegativeAckRedeliveryDelayMs
.
This call is not blocking.
Example of usage:
while (true) {
Message msg;
consumer.receive(msg);
try {
// Process message...
consumer.acknowledge(msg);
} catch (Throwable t) {
log.warn("Failed to process message");
consumer.negativeAcknowledge(msg);
}
}
message | The Message to be acknowledged |
void pulsar::Consumer::negativeAcknowledge | ( | const MessageId & | messageId | ) |
Acknowledge the failure to process a single message.
When a message is "negatively acked" it will be marked for redelivery after some fixed delay. The delay is configurable when constructing the consumer with ConsumerConfiguration#setNegativeAckRedeliveryDelayMs
.
This call is not blocking.
Example of usage:
while (true) {
Message msg;
consumer.receive(msg);
try {
// Process message...
consumer.acknowledge(msg);
} catch (Throwable t) {
log.warn("Failed to process message");
consumer.negativeAcknowledge(msg);
}
}
messageId | The MessageId to be acknowledged |
Result pulsar::Consumer::pauseMessageListener | ( | ) |
Pause receiving messages via the messageListener, till resumeMessageListener() is called.
Receive a single message.
If a message is not immediately available, this method will block until a new message is available.
msg | a non-const reference where the received message will be copied |
msg | a non-const reference where the received message will be copied |
timeoutMs | the receive timeout in milliseconds |
void pulsar::Consumer::receiveAsync | ( | ReceiveCallback | callback | ) |
Receive a single message
Retrieves a message when it will be available and completes callback with received message.
receiveAsync() should be called subsequently once callback gets completed with received message. Else it creates backlog of receive requests in the application.
ReceiveCallback | will be completed when message is available |
void pulsar::Consumer::redeliverUnacknowledgedMessages | ( | ) |
Redelivers all the unacknowledged messages. In Failover mode, the request is ignored if the consumer is not active for the given topic. In Shared mode, the consumers messages to be redelivered are distributed across all the connected consumers. This is a non blocking call and doesn't throw an exception. In case the connection breaks, the messages are redelivered after reconnect.
Result pulsar::Consumer::resumeMessageListener | ( | ) |
Resume receiving the messages via the messageListener. Asynchronously receive all the messages enqueued from time pauseMessageListener() was called.
Reset the subscription associated with this consumer to a specific message id. The message id can either be a specific message or represent the first or last messages in the topic.
Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the seek() on the individual partitions.
messageId | the message id where to reposition the subscription |
Result pulsar::Consumer::seek | ( | uint64_t | timestamp | ) |
Reset the subscription associated with this consumer to a specific message publish time.
timestamp | the message publish time where to reposition the subscription |
|
virtual |
Asynchronously reset the subscription associated with this consumer to a specific message id. The message id can either be a specific message or represent the first or last messages in the topic.
Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the seek() on the individual partitions.
messageId | the message id where to reposition the subscription |
|
virtual |
Asynchronously reset the subscription associated with this consumer to a specific message publish time.
timestamp | the message publish time where to reposition the subscription |
Result pulsar::Consumer::unsubscribe | ( | ) |
Unsubscribe the current consumer from the topic.
This method will block until the operation is completed. Once the consumer is unsubscribed, no more messages will be received and subsequent new messages will not be retained for this consumer.
This consumer object cannot be reused.
void pulsar::Consumer::unsubscribeAsync | ( | ResultCallback | callback | ) |
Asynchronously unsubscribe the current consumer from the topic.
This method will block until the operation is completed. Once the consumer is unsubscribed, no more messages will be received and subsequent new messages will not be retained for this consumer.
This consumer object cannot be reused.
callback | the callback to get notified when the operation is complete |