pulsar-client-cpp
Loading...
Searching...
No Matches
Public Member Functions | Friends | List of all members
pulsar::Consumer Class Reference

Public Member Functions

 Consumer ()
 
const std::string & getTopic () const
 
const std::string & getSubscriptionName () const
 
const std::string & getConsumerName () const
 
Result unsubscribe ()
 
void unsubscribeAsync (ResultCallback callback)
 
Result receive (Message &msg)
 
template<typename T >
Result receive (TypedMessage< T > &msg, typename TypedMessage< T >::Decoder decoder)
 
Result receive (Message &msg, int timeoutMs)
 
template<typename T >
Result receive (TypedMessage< T > &msg, int timeoutMs, typename TypedMessage< T >::Decoder decoder)
 
void receiveAsync (ReceiveCallback callback)
 
template<typename T >
void receiveAsync (std::function< void(Result result, const TypedMessage< T > &)> callback, typename TypedMessage< T >::Decoder decoder)
 
Result batchReceive (Messages &msgs)
 
void batchReceiveAsync (BatchReceiveCallback callback)
 
Result acknowledge (const Message &message)
 
Result acknowledge (const MessageId &messageId)
 
Result acknowledge (const MessageIdList &messageIdList)
 
void acknowledgeAsync (const Message &message, ResultCallback callback)
 
void acknowledgeAsync (const MessageId &messageId, ResultCallback callback)
 
void acknowledgeAsync (const MessageIdList &messageIdList, ResultCallback callback)
 
Result acknowledgeCumulative (const Message &message)
 
Result acknowledgeCumulative (const MessageId &messageId)
 
void acknowledgeCumulativeAsync (const Message &message, ResultCallback callback)
 
void acknowledgeCumulativeAsync (const MessageId &messageId, ResultCallback callback)
 
void negativeAcknowledge (const Message &message)
 
void negativeAcknowledge (const MessageId &messageId)
 
Result close ()
 
void closeAsync (ResultCallback callback)
 
Result pauseMessageListener ()
 
Result resumeMessageListener ()
 
void redeliverUnacknowledgedMessages ()
 
Result getBrokerConsumerStats (BrokerConsumerStats &brokerConsumerStats)
 
void getBrokerConsumerStatsAsync (BrokerConsumerStatsCallback callback)
 
Result seek (const MessageId &messageId)
 
Result seek (uint64_t timestamp)
 
virtual void seekAsync (const MessageId &messageId, ResultCallback callback)
 
virtual void seekAsync (uint64_t timestamp, ResultCallback callback)
 
bool isConnected () const
 
void getLastMessageIdAsync (GetLastMessageIdCallback callback)
 
Result getLastMessageId (MessageId &messageId)
 

Friends

class PulsarFriend
 
class PulsarWrapper
 
class MultiTopicsConsumerImpl
 
class ConsumerImpl
 
class ClientImpl
 
class ConsumerTest
 

Constructor & Destructor Documentation

◆ Consumer()

pulsar::Consumer::Consumer ( )

Construct an uninitialized consumer object

Member Function Documentation

◆ acknowledge() [1/3]

Result pulsar::Consumer::acknowledge ( const Message message)

Acknowledge the reception of a single message.

This method will block until an acknowledgement is sent to the broker. After that, the message will not be re-delivered to this consumer.

See also
asyncAcknowledge
Parameters
messagethe message to acknowledge
Returns
ResultOk if the message was successfully acknowledged
ResultError if there was a failure

◆ acknowledge() [2/3]

Result pulsar::Consumer::acknowledge ( const MessageId messageId)

Acknowledge the reception of a single message.

This method is blocked until an acknowledgement is sent to the broker. After that, the message is not re-delivered to the consumer.

See also
asyncAcknowledge
Parameters
messageIdthe MessageId to acknowledge
Returns
ResultOk if the messageId is successfully acknowledged

◆ acknowledge() [3/3]

Result pulsar::Consumer::acknowledge ( const MessageIdList &  messageIdList)

Acknowledge the consumption of a list of message.

Parameters
messageIdList

◆ acknowledgeAsync() [1/3]

void pulsar::Consumer::acknowledgeAsync ( const Message message,
ResultCallback  callback 
)

Asynchronously acknowledge the reception of a single message.

This method will initiate the operation and return immediately. The provided callback will be triggered when the operation is complete.

Parameters
messagethe message to acknowledge
callbackcallback that will be triggered when the message has been acknowledged

◆ acknowledgeAsync() [2/3]

void pulsar::Consumer::acknowledgeAsync ( const MessageId messageId,
ResultCallback  callback 
)

Asynchronously acknowledge the reception of a single message.

This method initiates the operation and returns the result immediately. The provided callback is triggered when the operation is completed.

Parameters
messageIdthe messageId to acknowledge
callbackthe callback that is triggered when the message has been acknowledged or not

◆ acknowledgeAsync() [3/3]

void pulsar::Consumer::acknowledgeAsync ( const MessageIdList &  messageIdList,
ResultCallback  callback 
)

Asynchronously acknowledge the consumption of a list of message.

Parameters
messageIdList
callbackthe callback that is triggered when the message has been acknowledged or not
Returns

◆ acknowledgeCumulative() [1/2]

Result pulsar::Consumer::acknowledgeCumulative ( const Message message)

Acknowledge the reception of all the messages in the stream up to (and including) the provided message.

This method will block until an acknowledgement is sent to the broker. After that, the messages will not be re-delivered to this consumer.

Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.

It's equivalent to calling asyncAcknowledgeCumulative(const Message&, ResultCallback) and waiting for the callback to be triggered.

Parameters
messagethe last message in the stream to acknowledge
Returns
ResultOk if the message was successfully acknowledged. All previously delivered messages for this topic are also acknowledged.
ResultError if there was a failure

◆ acknowledgeCumulative() [2/2]

Result pulsar::Consumer::acknowledgeCumulative ( const MessageId messageId)

Acknowledge the reception of all the messages in the stream up to (and including) the provided message.

This method is blocked until an acknowledgement is sent to the broker. After that, the message is not re-delivered to this consumer.

Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.

It is equivalent to calling the asyncAcknowledgeCumulative(const Message&, ResultCallback) method and waiting for the callback to be triggered.

Parameters
messageIdthe last messageId in the stream to acknowledge
Returns
ResultOk if the message is successfully acknowledged. All previously delivered messages for this topic are also acknowledged.

◆ acknowledgeCumulativeAsync() [1/2]

void pulsar::Consumer::acknowledgeCumulativeAsync ( const Message message,
ResultCallback  callback 
)

Asynchronously acknowledge the reception of all the messages in the stream up to (and including) the provided message.

This method will initiate the operation and return immediately. The provided callback will be triggered when the operation is complete.

Parameters
messagethe message to acknowledge
callbackcallback that will be triggered when the message has been acknowledged

◆ acknowledgeCumulativeAsync() [2/2]

void pulsar::Consumer::acknowledgeCumulativeAsync ( const MessageId messageId,
ResultCallback  callback 
)

Asynchronously acknowledge the reception of all the messages in the stream up to (and including) the provided message.

This method initiates the operation and returns the result immediately. The provided callback is triggered when the operation is completed.

Parameters
messageIdthe messageId to acknowledge
callbackthe callback that is triggered when the message has been acknowledged or not

◆ batchReceive()

Result pulsar::Consumer::batchReceive ( Messages msgs)

Batch receiving messages.

This calls blocks until has enough messages or wait timeout, more details to see BatchReceivePolicy.

Parameters
msgsa non-const reference where the received messages will be copied
Returns
ResultOk when a message is received
ResultInvalidConfiguration if a message listener had been set in the configuration

◆ batchReceiveAsync()

void pulsar::Consumer::batchReceiveAsync ( BatchReceiveCallback  callback)

Async Batch receiving messages.

Retrieves a message when it will be available and completes callback with received message.

batchReceiveAsync() should be called subsequently once callback gets completed with received message. Else it creates backlog of receive requests in the application.

Parameters
BatchReceiveCallbackwill be completed when messages are available.

◆ close()

Result pulsar::Consumer::close ( )

Close the consumer and stop the broker to push more messages

◆ closeAsync()

void pulsar::Consumer::closeAsync ( ResultCallback  callback)

Asynchronously close the consumer and stop the broker to push more messages

◆ getBrokerConsumerStats()

Result pulsar::Consumer::getBrokerConsumerStats ( BrokerConsumerStats brokerConsumerStats)

Gets Consumer Stats from broker. The stats are cached for 30 seconds, if a call is made before the stats returned by the previous call expires then cached data will be returned. BrokerConsumerStats::isValid() function can be used to check if the stats are still valid.

Parameters
brokerConsumerStats- if the function returns ResultOk, this object will contain consumer stats
Note
This is a blocking call with timeout of thirty seconds.

◆ getBrokerConsumerStatsAsync()

void pulsar::Consumer::getBrokerConsumerStatsAsync ( BrokerConsumerStatsCallback  callback)

Asynchronous call to gets Consumer Stats from broker. The stats are cached for 30 seconds, if a call is made before the stats returned by the previous call expires then cached data will be returned. BrokerConsumerStats::isValid() function can be used to check if the stats are still valid.

Parameters
callback- callback function to get the brokerConsumerStats, if result is ResultOk then the brokerConsumerStats will be populated

◆ getConsumerName()

const std::string & pulsar::Consumer::getConsumerName ( ) const
Returns
the consumer name

◆ getLastMessageId()

Result pulsar::Consumer::getLastMessageId ( MessageId messageId)

Get an ID of the last available message or a message ID with -1 as an entryId if the topic is empty.

◆ getLastMessageIdAsync()

void pulsar::Consumer::getLastMessageIdAsync ( GetLastMessageIdCallback  callback)

Asynchronously get an ID of the last available message or a message ID with -1 as an entryId if the topic is empty.

◆ getSubscriptionName()

const std::string & pulsar::Consumer::getSubscriptionName ( ) const
Returns
the subscription name

◆ getTopic()

const std::string & pulsar::Consumer::getTopic ( ) const
Returns
the topic this consumer is subscribed to

◆ isConnected()

bool pulsar::Consumer::isConnected ( ) const
Returns
Whether the consumer is currently connected to the broker

◆ negativeAcknowledge() [1/2]

void pulsar::Consumer::negativeAcknowledge ( const Message message)

Acknowledge the failure to process a single message.

When a message is "negatively acked" it will be marked for redelivery after some fixed delay. The delay is configurable when constructing the consumer with ConsumerConfiguration#setNegativeAckRedeliveryDelayMs.

This call is not blocking.

Example of usage:


while (true) {
    Message msg;
    consumer.receive(msg);

    try {
         // Process message...

         consumer.acknowledge(msg);
    } catch (Throwable t) {
         log.warn("Failed to process message");
         consumer.negativeAcknowledge(msg);
    }
}
Parameters
messageThe Message to be acknowledged

◆ negativeAcknowledge() [2/2]

void pulsar::Consumer::negativeAcknowledge ( const MessageId messageId)

Acknowledge the failure to process a single message.

When a message is "negatively acked" it will be marked for redelivery after some fixed delay. The delay is configurable when constructing the consumer with ConsumerConfiguration#setNegativeAckRedeliveryDelayMs.

This call is not blocking.

Example of usage:


while (true) {
    Message msg;
    consumer.receive(msg);

    try {
         // Process message...

         consumer.acknowledge(msg);
    } catch (Throwable t) {
         log.warn("Failed to process message");
         consumer.negativeAcknowledge(msg);
    }
}
Parameters
messageIdThe MessageId to be acknowledged

◆ pauseMessageListener()

Result pulsar::Consumer::pauseMessageListener ( )

Pause receiving messages via the messageListener, till resumeMessageListener() is called.

◆ receive() [1/2]

Result pulsar::Consumer::receive ( Message msg)

Receive a single message.

If a message is not immediately available, this method will block until a new message is available.

Parameters
msga non-const reference where the received message will be copied
Returns
ResultOk when a message is received
ResultInvalidConfiguration if a message listener had been set in the configuration

◆ receive() [2/2]

Result pulsar::Consumer::receive ( Message msg,
int  timeoutMs 
)
Parameters
msga non-const reference where the received message will be copied
timeoutMsthe receive timeout in milliseconds
Returns
ResultOk if a message was received
ResultTimeout if the receive timeout was triggered
ResultInvalidConfiguration if a message listener had been set in the configuration

◆ receiveAsync()

void pulsar::Consumer::receiveAsync ( ReceiveCallback  callback)

Receive a single message

Retrieves a message when it will be available and completes callback with received message.

receiveAsync() should be called subsequently once callback gets completed with received message. Else it creates backlog of receive requests in the application.

Parameters
ReceiveCallbackwill be completed when message is available

◆ redeliverUnacknowledgedMessages()

void pulsar::Consumer::redeliverUnacknowledgedMessages ( )

Redelivers all the unacknowledged messages. In Failover mode, the request is ignored if the consumer is not active for the given topic. In Shared mode, the consumers messages to be redelivered are distributed across all the connected consumers. This is a non blocking call and doesn't throw an exception. In case the connection breaks, the messages are redelivered after reconnect.

◆ resumeMessageListener()

Result pulsar::Consumer::resumeMessageListener ( )

Resume receiving the messages via the messageListener. Asynchronously receive all the messages enqueued from time pauseMessageListener() was called.

◆ seek() [1/2]

Result pulsar::Consumer::seek ( const MessageId messageId)

Reset the subscription associated with this consumer to a specific message id. The message id can either be a specific message or represent the first or last messages in the topic.

Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the seek() on the individual partitions.

Parameters
messageIdthe message id where to reposition the subscription

◆ seek() [2/2]

Result pulsar::Consumer::seek ( uint64_t  timestamp)

Reset the subscription associated with this consumer to a specific message publish time.

Parameters
timestampthe message publish time where to reposition the subscription

◆ seekAsync() [1/2]

virtual void pulsar::Consumer::seekAsync ( const MessageId messageId,
ResultCallback  callback 
)
virtual

Asynchronously reset the subscription associated with this consumer to a specific message id. The message id can either be a specific message or represent the first or last messages in the topic.

Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the seek() on the individual partitions.

Parameters
messageIdthe message id where to reposition the subscription

◆ seekAsync() [2/2]

virtual void pulsar::Consumer::seekAsync ( uint64_t  timestamp,
ResultCallback  callback 
)
virtual

Asynchronously reset the subscription associated with this consumer to a specific message publish time.

Parameters
timestampthe message publish time where to reposition the subscription

◆ unsubscribe()

Result pulsar::Consumer::unsubscribe ( )

Unsubscribe the current consumer from the topic.

This method will block until the operation is completed. Once the consumer is unsubscribed, no more messages will be received and subsequent new messages will not be retained for this consumer.

This consumer object cannot be reused.

See also
asyncUnsubscribe
Returns
Result::ResultOk if the unsubscribe operation completed successfully
Result::ResultError if the unsubscribe operation failed

◆ unsubscribeAsync()

void pulsar::Consumer::unsubscribeAsync ( ResultCallback  callback)

Asynchronously unsubscribe the current consumer from the topic.

This method will block until the operation is completed. Once the consumer is unsubscribed, no more messages will be received and subsequent new messages will not be retained for this consumer.

This consumer object cannot be reused.

Parameters
callbackthe callback to get notified when the operation is complete

The documentation for this class was generated from the following file: