Interface Consumer<T>
- All Superinterfaces:
- AutoCloseable, Closeable
All the operations on the consumer instance are thread safe.
- 
Method Summary
Modifier and Type, Method: Description
- void acknowledge(List<MessageId> messageIdList): Acknowledge the consumption of a list of messages.
- void acknowledge(Message<?> message): Acknowledge the consumption of a single message.
- void acknowledge(MessageId messageId): Acknowledge the consumption of a single message, identified by its MessageId.
- void acknowledge(Messages<?> messages): Acknowledge the consumption of Messages.
- CompletableFuture<Void> acknowledgeAsync(List<MessageId> messageIdList): Asynchronously acknowledge the consumption of a list of messages.
- CompletableFuture<Void> acknowledgeAsync(Message<?> message): Asynchronously acknowledge the consumption of a single message.
- CompletableFuture<Void> acknowledgeAsync(MessageId messageId): Asynchronously acknowledge the consumption of a single message.
- CompletableFuture<Void> acknowledgeAsync(MessageId messageId, Transaction txn): Asynchronously acknowledge the consumption of a single message; the acknowledgment is stored in pending ack.
- CompletableFuture<Void> acknowledgeAsync(Messages<?> messages): Asynchronously acknowledge the consumption of Messages.
- void acknowledgeCumulative(Message<?> message): Acknowledge the reception of all the messages in the stream up to (and including) the provided message.
- void acknowledgeCumulative(MessageId messageId): Acknowledge the reception of all the messages in the stream up to (and including) the provided message.
- CompletableFuture<Void> acknowledgeCumulativeAsync(Message<?> message): Asynchronously acknowledge the reception of all the messages in the stream up to (and including) the provided message.
- CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId): Asynchronously acknowledge the reception of all the messages in the stream up to (and including) the provided message.
- CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId, Transaction txn): Acknowledge the reception of all the messages in the stream up to (and including) the provided message with this transaction; the acknowledgment is stored in the transaction's pending ack.
- Messages<T> batchReceive(): Batch receiving messages.
- CompletableFuture<Messages<T>> batchReceiveAsync(): Batch receiving messages.
- void close(): Close the consumer and stop the broker from pushing more messages.
- CompletableFuture<Void> closeAsync(): Asynchronously close the consumer and stop the broker from pushing more messages.
- String getConsumerName(): Get the name of the consumer.
- long getLastDisconnectedTimestamp()
- MessageId getLastMessageId(): Get the last message id available for consumption.
- CompletableFuture<MessageId> getLastMessageIdAsync(): Get the last message id available for consumption.
- ConsumerStats getStats(): Get statistics for the consumer.
- String getSubscription(): Get a subscription for the consumer.
- String getTopic(): Get a topic for the consumer.
- boolean hasReachedEndOfTopic(): Return true if the topic was terminated and this consumer has already consumed all the messages in the topic.
- boolean isConnected()
- void negativeAcknowledge(Message<?> message): Acknowledge the failure to process a single message.
- void negativeAcknowledge(MessageId messageId): Acknowledge the failure to process a single message.
- void negativeAcknowledge(Messages<?> messages): Acknowledge the failure to process Messages.
- void pause(): Stop requesting new messages from the broker until resume() is called.
- Message<T> receive(): Receive a single message.
- Message<T> receive(int timeout, TimeUnit unit): Receive a single message.
- CompletableFuture<Message<T>> receiveAsync(): Receive a single message.
- void reconsumeLater(Message<?> message, long delayTime, TimeUnit unit): Reconsume a single message later.
- void reconsumeLater(Message<?> message, Map<String, String> customProperties, long delayTime, TimeUnit unit): Reconsume a single message later.
- void reconsumeLater(Messages<?> messages, long delayTime, TimeUnit unit): Reconsume Messages later.
- CompletableFuture<Void> reconsumeLaterAsync(Message<?> message, long delayTime, TimeUnit unit): Asynchronously reconsume a single message later.
- CompletableFuture<Void> reconsumeLaterAsync(Message<?> message, Map<String, String> customProperties, long delayTime, TimeUnit unit): Asynchronously reconsume a single message later.
- CompletableFuture<Void> reconsumeLaterAsync(Messages<?> messages, long delayTime, TimeUnit unit): Asynchronously reconsume Messages later.
- void reconsumeLaterCumulative(Message<?> message, long delayTime, TimeUnit unit): Reconsume later all the messages in the stream up to (and including) the provided message.
- CompletableFuture<Void> reconsumeLaterCumulativeAsync(Message<?> message, long delayTime, TimeUnit unit): Asynchronously reconsume later all the messages in the stream up to (and including) the provided message.
- CompletableFuture<Void> reconsumeLaterCumulativeAsync(Message<?> message, Map<String, String> customProperties, long delayTime, TimeUnit unit): Asynchronously reconsume later all the messages in the stream up to (and including) the provided message.
- void redeliverUnacknowledgedMessages(): Redelivers all the unacknowledged messages.
- void resume(): Resume requesting messages from the broker.
- void seek(long timestamp): Reset the subscription associated with this consumer to a specific message publish time.
- void seek(Function<String, Object> function): Reset the subscription associated with this consumer to a specific message ID or message publish time.
- void seek(MessageId messageId): Reset the subscription associated with this consumer to a specific message id.
- CompletableFuture<Void> seekAsync(long timestamp): Reset the subscription associated with this consumer to a specific message publish time.
- CompletableFuture<Void> seekAsync(Function<String, Object> function): Reset the subscription associated with this consumer to a specific message ID or message publish time asynchronously.
- CompletableFuture<Void> seekAsync(MessageId messageId): Reset the subscription associated with this consumer to a specific message id.
- void unsubscribe(): Unsubscribe the consumer.
- CompletableFuture<Void> unsubscribeAsync(): Asynchronously unsubscribe the consumer.
- 
Method Details
- 
getTopic
String getTopic()
Get a topic for the consumer.
- Returns:
- topic for the consumer
 
- 
getSubscription
String getSubscription()
Get a subscription for the consumer.
- Returns:
- subscription for the consumer
 
- 
unsubscribe
void unsubscribe() throws PulsarClientException
Unsubscribe the consumer.
This call blocks until the consumer is unsubscribed.
Unsubscribing causes the subscription to be deleted, and all the retained data can potentially be deleted as well.
The operation fails when performed on a shared subscription where multiple consumers are currently connected.
- Throws:
- PulsarClientException - if the operation fails
 
- 
unsubscribeAsync
CompletableFuture<Void> unsubscribeAsync()
Asynchronously unsubscribe the consumer.
- Returns:
- CompletableFuture to track the operation
 
- 
receive
Message<T> receive() throws PulsarClientException
Receive a single message.
This call blocks until a message is available.
- Returns:
- the received message
- Throws:
- PulsarClientException.AlreadyClosedException - if the consumer was already closed
- PulsarClientException.InvalidConfigurationException - if a message listener was defined in the configuration
- PulsarClientException
 
- 
receiveAsync
CompletableFuture<Message<T>> receiveAsync()
Receive a single message.
Retrieves a message when one becomes available and completes the CompletableFuture with the received message.
receiveAsync() should be called again only after the previously returned CompletableFuture has completed with a received message. Otherwise, a backlog of receive requests builds up in the application.
The returned future can be cancelled before completion by calling .cancel(false) (CompletableFuture.cancel(boolean)) to remove it from the backlog of receive requests. Another way to ensure proper cleanup of the returned future is to use the CompletableFuture.orTimeout method, which is available on JDK 9+. That removes it from the backlog of receive requests if receiving exceeds the timeout.
- Returns:
- CompletableFuture<Message> that will be completed when a message is available
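The two cleanup options described above can be tried out with the JDK alone. The sketch below is only an illustration: the never-completing future returned by the hypothetical pendingReceive() helper stands in for a receiveAsync() call on which no message arrives, and no Pulsar client or broker is involved.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class PendingReceiveCleanup {

    // Hypothetical stand-in for consumer.receiveAsync(): a future that is
    // never completed because no message ever arrives.
    static CompletableFuture<String> pendingReceive() {
        return new CompletableFuture<>();
    }

    // Option 1: cancel the pending request explicitly with cancel(false).
    static boolean cancelPending() {
        CompletableFuture<String> future = pendingReceive();
        future.cancel(false);
        return future.isCancelled();
    }

    // Option 2: bound the wait with orTimeout (JDK 9+).
    // Returns true if the future failed with a TimeoutException.
    static boolean timesOut(long timeoutMillis) {
        CompletableFuture<String> future =
                pendingReceive().orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
        try {
            future.get();
            return false;
        } catch (ExecutionException e) {
            return e.getCause() instanceof TimeoutException;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("cancelled: " + cancelPending());
        System.out.println("timed out: " + timesOut(100));
    }
}
```

With a real consumer, the same two calls would be applied to the future returned by receiveAsync(); which one fits better depends on whether the application knows when it stops being interested in the message or only wants an upper bound on the wait.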
 
- 
receive
Message<T> receive(int timeout, TimeUnit unit) throws PulsarClientException
Receive a single message.
Retrieves a message, waiting up to the specified wait time if necessary.
- Parameters:
- timeout - 0 or less means immediate rather than infinite
- unit -
- Returns:
- the received Message, or null if no message is available before the timeout
- Throws:
- PulsarClientException.AlreadyClosedException - if the consumer was already closed
- PulsarClientException.InvalidConfigurationException - if a message listener was defined in the configuration
- PulsarClientException
 
- 
batchReceive
Messages<T> batchReceive() throws PulsarClientException
Batch receiving messages.
This call blocks until enough messages are available or the wait times out. See BatchReceivePolicy for details.
- Returns:
- messages
- Throws:
- PulsarClientException
- Since:
- 2.4.1
 
- 
batchReceiveAsync
CompletableFuture<Messages<T>> batchReceiveAsync()
Batch receiving messages.
Retrieves messages when enough messages are available or the wait times out, and completes the CompletableFuture with the received messages.
batchReceiveAsync() should be called again only after the previously returned CompletableFuture has completed with received messages. Otherwise, a backlog of receive requests builds up in the application.
The returned future can be cancelled before completion by calling .cancel(false) (CompletableFuture.cancel(boolean)) to remove it from the backlog of receive requests. Another way to ensure proper cleanup of the returned future is to use the CompletableFuture.orTimeout method, which is available on JDK 9+. That removes it from the backlog of receive requests if receiving exceeds the timeout.
- Returns:
- messages
- Throws:
- PulsarClientException
- Since:
- 2.4.1
 
- 
acknowledge
void acknowledge(Message<?> message) throws PulsarClientException
Acknowledge the consumption of a single message.
- Parameters:
- message - The Message to be acknowledged
- Throws:
- PulsarClientException.AlreadyClosedException - if the consumer was already closed
- PulsarClientException
 
- 
acknowledge
void acknowledge(MessageId messageId) throws PulsarClientException
Acknowledge the consumption of a single message, identified by its MessageId.
- Parameters:
- messageId - The MessageId to be acknowledged
- Throws:
- PulsarClientException.AlreadyClosedException - if the consumer was already closed
- PulsarClientException
 
- 
acknowledge
void acknowledge(Messages<?> messages) throws PulsarClientException
Acknowledge the consumption of Messages.
- Parameters:
- messages - messages
- Throws:
- PulsarClientException.AlreadyClosedException - if the consumer was already closed
- PulsarClientException
 
- 
acknowledge
void acknowledge(List<MessageId> messageIdList) throws PulsarClientException
Acknowledge the consumption of a list of messages.
- Parameters:
- messageIdList -
- Throws:
- PulsarClientException
 
- 
negativeAcknowledge
void negativeAcknowledge(Message<?> message)
Acknowledge the failure to process a single message.
When a message is "negatively acked" it will be marked for redelivery after some fixed delay. The delay is configurable when constructing the consumer with ConsumerBuilder.negativeAckRedeliveryDelay(long, TimeUnit).
This call is not blocking.
Example of usage:
    while (true) {
        Message<String> msg = consumer.receive();
        try {
            // Process message...
            consumer.acknowledge(msg);
        } catch (Throwable t) {
            log.warn("Failed to process message");
            consumer.negativeAcknowledge(msg);
        }
    }
- Parameters:
- message - The Message to be acknowledged
 
- 
negativeAcknowledge
void negativeAcknowledge(MessageId messageId)
Acknowledge the failure to process a single message.
When a message is "negatively acked" it will be marked for redelivery after some fixed delay. The delay is configurable when constructing the consumer with ConsumerBuilder.negativeAckRedeliveryDelay(long, TimeUnit).
This call is not blocking.
This variant accepts a MessageId rather than a Message object, which avoids keeping the payload in memory for an extended amount of time.
- Parameters:
- messageId - The MessageId to be acknowledged
 
- 
negativeAcknowledge
void negativeAcknowledge(Messages<?> messages)
Acknowledge the failure to process Messages.
When messages are "negatively acked" they will be marked for redelivery after some fixed delay. The delay is configurable when constructing the consumer with ConsumerBuilder.negativeAckRedeliveryDelay(long, TimeUnit).
This call is not blocking.
Example of usage:
    while (true) {
        Messages<String> msgs = consumer.batchReceive();
        try {
            // Process messages...
            consumer.acknowledge(msgs);
        } catch (Throwable t) {
            log.warn("Failed to process message");
            consumer.negativeAcknowledge(msgs);
        }
    }
- Parameters:
- messages - The Messages to be acknowledged
 
- 
reconsumeLater
void reconsumeLater(Message<?> message, long delayTime, TimeUnit unit) throws PulsarClientException
Reconsume a single message later.
When reconsumeLater is called for a message, the message is marked for redelivery after a custom delay.
Example of usage:
    while (true) {
        Message<String> msg = consumer.receive();
        try {
            // Process message...
            consumer.acknowledge(msg);
        } catch (Throwable t) {
            log.warn("Failed to process message");
            consumer.reconsumeLater(msg, 1000, TimeUnit.MILLISECONDS);
        }
    }
- Parameters:
- message - the Message to be reconsumed later
- delayTime - the amount of delay before the message will be delivered
- unit - the time unit for the delay
- Throws:
- PulsarClientException.AlreadyClosedException - if the consumer was already closed
- PulsarClientException
 
- 
reconsumeLater
void reconsumeLater(Message<?> message, Map<String, String> customProperties, long delayTime, TimeUnit unit) throws PulsarClientException
Reconsume a single message later.
When reconsumeLater is called for a message, the message is marked for redelivery after a custom delay.
Example of usage:
    while (true) {
        Message<String> msg = consumer.receive();
        try {
            // Process message...
            consumer.acknowledge(msg);
        } catch (Throwable t) {
            log.warn("Failed to process message");
            consumer.reconsumeLater(msg, 1000, TimeUnit.MILLISECONDS);
        }
    }
- Parameters:
- message - the Message to be reconsumed later
- customProperties - the custom properties to attach to the message reconsumed later
- delayTime - the amount of delay before the message will be delivered
- unit - the time unit for the delay
- Throws:
- PulsarClientException.AlreadyClosedException - if the consumer was already closed
- PulsarClientException
 
- 
reconsumeLater
void reconsumeLater(Messages<?> messages, long delayTime, TimeUnit unit) throws PulsarClientException
Reconsume Messages later.
- Parameters:
- messages - the Messages to be reconsumed later
- delayTime - the amount of delay before the message will be delivered
- unit - the time unit for the delay
- Throws:
- PulsarClientException.AlreadyClosedException - if the consumer was already closed
- PulsarClientException
 
- 
acknowledgeCumulative
void acknowledgeCumulative(Message<?> message) throws PulsarClientException
Acknowledge the reception of all the messages in the stream up to (and including) the provided message.
This method blocks until the acknowledgment has been sent to the broker. After that, the messages will not be re-delivered to this consumer.
Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
It is equivalent to calling acknowledgeCumulativeAsync(Message) and waiting for the returned future to complete.
- Parameters:
- message - The Message to be cumulatively acknowledged
- Throws:
- PulsarClientException.AlreadyClosedException - if the consumer was already closed
- PulsarClientException
 
- 
acknowledgeCumulative
void acknowledgeCumulative(MessageId messageId) throws PulsarClientException
Acknowledge the reception of all the messages in the stream up to (and including) the provided message.
This method blocks until the acknowledgment has been sent to the broker. After that, the messages will not be re-delivered to this consumer.
Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
It is equivalent to calling acknowledgeCumulativeAsync(MessageId) and waiting for the returned future to complete.
- Parameters:
- messageId - The MessageId to be cumulatively acknowledged
- Throws:
- PulsarClientException.AlreadyClosedException - if the consumer was already closed
- PulsarClientException
 
- 
acknowledgeCumulativeAsync
CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId, Transaction txn)
Acknowledge the reception of all the messages in the stream up to (and including) the provided message with this transaction; the acknowledgment is stored in the transaction's pending ack.
After the transaction commits, the messages from the end of the previous transaction's acked range up to the message acked by this transaction are actually acknowledged. After the transaction aborts, those same messages are redelivered to this consumer.
Cumulative acknowledge with a transaction supports only cumulative ack; sharing individual and cumulative ack is not supported yet.
If a cumulative ack with a transaction succeeds, a later messageId can be cumulatively acked with the same transaction. Cumulative ack with a transaction different from the previous one is not allowed until the previous transaction has committed or aborted.
Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
- Parameters:
- messageId - The MessageId to be cumulatively acknowledged
- txn - the Transaction to use for the cumulative ack
- Returns:
- CompletableFuture, the future of the ack result
- Throws:
- PulsarClientException.AlreadyClosedException - if the consumer was already closed
- PulsarClientException.TransactionConflictException - if the messageId being acked is less than the messageId in the pending ack state, or the transaction is different from the transaction in pending ack
- PulsarClientException.NotAllowedException - if the broker does not support transactions
- Since:
- 2.7.0
 
- 
reconsumeLaterCumulative
void reconsumeLaterCumulative(Message<?> message, long delayTime, TimeUnit unit) throws PulsarClientException
Reconsume later all the messages in the stream up to (and including) the provided message.
- Parameters:
- message - The message to be cumulatively reconsumed later
- delayTime - the amount of delay before the message will be delivered
- unit - the time unit for the delay
- Throws:
- PulsarClientException.AlreadyClosedException - if the consumer was already closed
- PulsarClientException
 
- 
acknowledgeAsync
CompletableFuture<Void> acknowledgeAsync(Message<?> message)
Asynchronously acknowledge the consumption of a single message.
- Parameters:
- message - The Message to be acknowledged
- Returns:
- a future that can be used to track the completion of the operation
 
- 
acknowledgeAsync
CompletableFuture<Void> acknowledgeAsync(MessageId messageId)
Asynchronously acknowledge the consumption of a single message.
- Parameters:
- messageId - The MessageId to be acknowledged
- Returns:
- a future that can be used to track the completion of the operation
 
- 
acknowledgeAsync
CompletableFuture<Void> acknowledgeAsync(MessageId messageId, Transaction txn)
Asynchronously acknowledge the consumption of a single message; the acknowledgment is stored in pending ack.
After the transaction commits, the message is actually acknowledged. After the transaction aborts, the message is redelivered.
- Parameters:
- messageId - the MessageId to be individually acknowledged
- txn - the Transaction to use for the ack
- Returns:
- CompletableFuture, the future of the ack result
- Throws:
- PulsarClientException.AlreadyClosedException - if the consumer was already closed
- PulsarClientException.TransactionConflictException - if the messageId has already been acked by another transaction
- PulsarClientException.NotAllowedException - if the broker does not support transactions, or the batch size is not found in the consumer's pending ack
- Since:
- 2.7.0
 
- 
acknowledgeAsync
CompletableFuture<Void> acknowledgeAsync(Messages<?> messages)
Asynchronously acknowledge the consumption of Messages.
- Parameters:
- messages - The Messages to be acknowledged
- Returns:
- a future that can be used to track the completion of the operation
 
- 
acknowledgeAsync
CompletableFuture<Void> acknowledgeAsync(List<MessageId> messageIdList)
Asynchronously acknowledge the consumption of a list of messages.
- Parameters:
- messageIdList -
- Returns:
 
- 
reconsumeLaterAsync
CompletableFuture<Void> reconsumeLaterAsync(Message<?> message, long delayTime, TimeUnit unit)
Asynchronously reconsume a single message later.
- Parameters:
- message - The Message to be reconsumed later
- delayTime - the amount of delay before the message will be delivered
- unit - the time unit for the delay
- Returns:
- a future that can be used to track the completion of the operation
 
- 
reconsumeLaterAsync
CompletableFuture<Void> reconsumeLaterAsync(Message<?> message, Map<String, String> customProperties, long delayTime, TimeUnit unit)
Asynchronously reconsume a single message later.
- Parameters:
- message - The Message to be reconsumed later
- customProperties - The custom properties to attach to the message reconsumed later
- delayTime - the amount of delay before the message will be delivered
- unit - the time unit for the delay
- Returns:
- a future that can be used to track the completion of the operation
 
- 
reconsumeLaterAsync
CompletableFuture<Void> reconsumeLaterAsync(Messages<?> messages, long delayTime, TimeUnit unit)
Asynchronously reconsume Messages later.
- Parameters:
- messages - The Messages to be reconsumed later
- delayTime - the amount of delay before the message will be delivered
- unit - the time unit for the delay
- Returns:
- a future that can be used to track the completion of the operation
 
- 
acknowledgeCumulativeAsync
CompletableFuture<Void> acknowledgeCumulativeAsync(Message<?> message)
Asynchronously acknowledge the reception of all the messages in the stream up to (and including) the provided message.
Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
- Parameters:
- message - The Message to be cumulatively acknowledged
- Returns:
- a future that can be used to track the completion of the operation
 
- 
acknowledgeCumulativeAsync
CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId)
Asynchronously acknowledge the reception of all the messages in the stream up to (and including) the provided message.
Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
- Parameters:
- messageId - The MessageId to be cumulatively acknowledged
- Returns:
- a future that can be used to track the completion of the operation
 
- 
reconsumeLaterCumulativeAsync
CompletableFuture<Void> reconsumeLaterCumulativeAsync(Message<?> message, long delayTime, TimeUnit unit)
Asynchronously reconsume later all the messages in the stream up to (and including) the provided message.
Cumulative reconsumeLater cannot be used when the consumer type is set to ConsumerShared.
- Parameters:
- message - The message to be cumulatively reconsumed later
- delayTime - the amount of delay before the message will be delivered
- unit - the time unit for the delay
- Returns:
- a future that can be used to track the completion of the operation
 
- 
reconsumeLaterCumulativeAsync
CompletableFuture<Void> reconsumeLaterCumulativeAsync(Message<?> message, Map<String, String> customProperties, long delayTime, TimeUnit unit)
Asynchronously reconsume later all the messages in the stream up to (and including) the provided message.
Cumulative reconsumeLater cannot be used when the consumer type is set to ConsumerShared.
- Parameters:
- message - The message to be cumulatively reconsumed later
- customProperties - The custom properties to attach to the messages cumulatively reconsumed later
- delayTime - the amount of delay before the message will be delivered
- unit - the time unit for the delay
- Returns:
- a future that can be used to track the completion of the operation
 
- 
getStats
ConsumerStats getStats()
Get statistics for the consumer.
- numMsgsReceived : Number of messages received in the current interval
- numBytesReceived : Number of bytes received in the current interval
- numReceiveFailed : Number of messages failed to receive in the current interval
- numAcksSent : Number of acks sent in the current interval
- numAcksFailed : Number of acks failed to send in the current interval
- totalMsgsReceived : Total number of messages received
- totalBytesReceived : Total number of bytes received
- totalReceiveFailed : Total number of messages failed to receive
- totalAcksSent : Total number of acks sent
- totalAcksFailed : Total number of acks that failed to send
- Returns:
- statistics for the consumer
 
- 
close
void close() throws PulsarClientException
Close the consumer and stop the broker from pushing more messages.
- Specified by:
- close in interface AutoCloseable
- Specified by:
- close in interface Closeable
- Throws:
- PulsarClientException
 
- 
closeAsync
CompletableFuture<Void> closeAsync()
Asynchronously close the consumer and stop the broker from pushing more messages.
- Returns:
- a future that can be used to track the completion of the operation
 
- 
hasReachedEndOfTopic
boolean hasReachedEndOfTopic()
Return true if the topic was terminated and this consumer has already consumed all the messages in the topic.
Note that this does not simply mean that the consumer has caught up with the last message published by producers; the topic must have been explicitly "terminated".
- 
redeliverUnacknowledgedMessages
void redeliverUnacknowledgedMessages()
Redelivers all the unacknowledged messages.
In Failover mode, the request is ignored if the consumer is not active for the given topic. In Shared mode, the consumer's messages to be redelivered are distributed across all the connected consumers.
This is a non-blocking call and does not throw an exception. If the connection breaks, the messages are redelivered after reconnecting.
- 
seek
void seek(MessageId messageId) throws PulsarClientException
Reset the subscription associated with this consumer to a specific message id.
The message id can either be a specific message or represent the first or last messages in the topic.
- MessageId.earliest: Reset the subscription on the earliest message available in the topic
- MessageId.latest: Reset the subscription on the latest message in the topic
Note: this operation can only be done on non-partitioned topics. For partitioned topics, perform the seek() on the individual partitions instead.
- Parameters:
- messageId - the message id where to reposition the subscription
- Throws:
- PulsarClientException
 
- 
seek
void seek(long timestamp) throws PulsarClientException
Reset the subscription associated with this consumer to a specific message publish time.
- Parameters:
- timestamp - the message publish time where to reposition the subscription
- Throws:
- PulsarClientException
 
- 
seek
void seek(Function<String, Object> function) throws PulsarClientException
Reset the subscription associated with this consumer to a specific message ID or message publish time.
The Function input is the topic+partition name. It must return either a timestamp or a MessageId, which is used as the seek position for that partition. An exception is thrown if any other object type is returned. If it returns null, the partition is left untouched. An exception in one partition may affect other partitions.
- Parameters:
- function -
- Throws:
- PulsarClientException
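As a rough illustration of the kind of function described above, the sketch below builds one that returns a publish-time timestamp for a single partition and null for every other partition. It uses only the JDK; the Function<String, Object> shape is taken from the description (topic+partition name in, timestamp or MessageId out), and the partition names are hypothetical.

```java
import java.util.function.Function;

final class SeekPositionFunction {

    // Returns a function that maps the given topic+partition name to a
    // publish-time timestamp (as a Long) and every other partition to null,
    // meaning "do not seek on that partition".
    static Function<String, Object> seekOnlyPartition(String partitionName, long timestampMillis) {
        return topicPartition -> topicPartition.equals(partitionName)
                ? Long.valueOf(timestampMillis)
                : null;
    }

    public static void main(String[] args) {
        // Hypothetical partition names, used only for illustration.
        Function<String, Object> fn =
                seekOnlyPartition("my-topic-partition-0", 1_700_000_000_000L);
        System.out.println(fn.apply("my-topic-partition-0"));
        System.out.println(fn.apply("my-topic-partition-1"));
    }
}
```

With a real consumer, a function of this shape would be passed to seek(function) or seekAsync(function). Returning a MessageId instead of a Long for some partitions is also allowed by the description; any other return type is expected to cause an exception.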
 
- 
seekAsync
CompletableFuture<Void> seekAsync(Function<String, Object> function)
Reset the subscription associated with this consumer to a specific message ID or message publish time asynchronously.
The Function input is the topic+partition name. It must return either a timestamp or a MessageId, which is used as the seek position for that partition. An exception is thrown if any other object type is returned. If it returns null, the partition is left untouched. An exception in one partition may affect other partitions.
- Parameters:
- function -
- Returns:
 
- 
seekAsync
CompletableFuture<Void> seekAsync(MessageId messageId)
Reset the subscription associated with this consumer to a specific message id.
The message id can either be a specific message or represent the first or last messages in the topic.
- MessageId.earliest: Reset the subscription on the earliest message available in the topic
- MessageId.latest: Reset the subscription on the latest message in the topic
Note: this operation can only be done on non-partitioned topics. For partitioned topics, perform the seek() on the individual partitions instead.
- Parameters:
- messageId - the message id where to reposition the subscription
- Returns:
- a future to track the completion of the seek operation
 
- 
seekAsync
CompletableFuture<Void> seekAsync(long timestamp)
Reset the subscription associated with this consumer to a specific message publish time.
- Parameters:
- timestamp - the message publish time where to reposition the subscription
- Returns:
- a future to track the completion of the seek operation
 
- 
getLastMessageId
MessageId getLastMessageId() throws PulsarClientException
Get the last message id available for consumption.
- Returns:
- the last message id.
- Throws:
- PulsarClientException
 
- 
getLastMessageIdAsync
CompletableFuture<MessageId> getLastMessageIdAsync()
Get the last message id available for consumption.
- Returns:
- a future that can be used to track the completion of the operation.
 
- 
isConnected
boolean isConnected()
- Returns:
- Whether the consumer is connected to the broker
 
- 
getConsumerName
String getConsumerName()
Get the name of the consumer.
- Returns:
- consumer name.
 
- 
pause
void pause()
Stop requesting new messages from the broker until resume() is called.
- 
resume
void resume()
Resume requesting messages from the broker.
- 
getLastDisconnectedTimestamp
long getLastDisconnectedTimestamp()
- Returns:
- The last disconnected timestamp of the consumer
 
 
-