Interface Consumer<T>
- All Superinterfaces:
- AutoCloseable, Closeable, MessageAcknowledger
All operations on a consumer instance are thread-safe.
-
Method Summary
Modifier and Type | Method | Description
Messages<T> | batchReceive() | Batch receiving messages.
CompletableFuture<Messages<T>> | batchReceiveAsync() | Batch receiving messages.
void | close() | Close the consumer and stop the broker from pushing more messages.
CompletableFuture<Void> | closeAsync() | Asynchronously close the consumer and stop the broker from pushing more messages.
String | getConsumerName() | Get the name of consumer.
long | getLastDisconnectedTimestamp() |
MessageId | getLastMessageId() | Deprecated. Use getLastMessageIds() instead.
CompletableFuture<MessageId> | getLastMessageIdAsync() | Deprecated. Use getLastMessageIdsAsync() instead.
List<TopicMessageId> | getLastMessageIds() | Get the last message ids of all the topics the consumer is subscribed to.
CompletableFuture<List<TopicMessageId>> | getLastMessageIdsAsync() | The asynchronous version of getLastMessageIds().
ConsumerStats | getStats() | Get statistics for the consumer.
String | getSubscription() | Get a subscription for the consumer.
String | getTopic() | Get a topic for the consumer.
boolean | hasReachedEndOfTopic() | Return true if the topic was terminated and this consumer has already consumed all the messages in the topic.
boolean | isConnected() |
void | negativeAcknowledge(Message<?> message) | Acknowledge the failure to process a single message.
void | negativeAcknowledge(MessageId messageId) | Acknowledge the failure to process a single message.
void | negativeAcknowledge(Messages<?> messages) | Acknowledge the failure to process Messages.
void | pause() | Stop requesting new messages from the broker until resume() is called.
Message<T> | receive() | Receives a single message in blocking mode.
Message<T> | receive(int timeout, TimeUnit unit) | Receive a single message.
CompletableFuture<Message<T>> | receiveAsync() | Receive a single message.
void | reconsumeLater(Message<?> message, long delayTime, TimeUnit unit) | reconsumeLater the consumption of Messages.
void | reconsumeLater(Message<?> message, Map<String, String> customProperties, long delayTime, TimeUnit unit) | reconsumeLater the consumption of Messages.
void | reconsumeLater(Messages<?> messages, long delayTime, TimeUnit unit) | reconsumeLater the consumption of Messages.
CompletableFuture<Void> | reconsumeLaterAsync(Message<?> message, long delayTime, TimeUnit unit) | Asynchronously reconsumeLater the consumption of a single message.
CompletableFuture<Void> | reconsumeLaterAsync(Message<?> message, Map<String, String> customProperties, long delayTime, TimeUnit unit) | Asynchronously reconsumeLater the consumption of a single message.
CompletableFuture<Void> | reconsumeLaterAsync(Messages<?> messages, long delayTime, TimeUnit unit) | Asynchronously reconsumeLater the consumption of Messages.
void | reconsumeLaterCumulative(Message<?> message, long delayTime, TimeUnit unit) | reconsumeLater the reception of all the messages in the stream up to (and including) the provided message.
CompletableFuture<Void> | reconsumeLaterCumulativeAsync(Message<?> message, long delayTime, TimeUnit unit) | Asynchronously reconsumeLater the reception of all the messages in the stream up to (and including) the provided message.
CompletableFuture<Void> | reconsumeLaterCumulativeAsync(Message<?> message, Map<String, String> customProperties, long delayTime, TimeUnit unit) | Asynchronously reconsumeLater the reception of all the messages in the stream up to (and including) the provided message.
void | redeliverUnacknowledgedMessages() | Redelivers all the unacknowledged messages.
void | resume() | Resume requesting messages from the broker.
void | seek(long timestamp) | Reset the subscription associated with this consumer to a specific message publish time.
void | seek(Function<String, Object> function) | Reset the subscription associated with this consumer to a specific message ID or message publish time.
void | seek(MessageId messageId) | Reset the subscription associated with this consumer to a specific message id.
CompletableFuture<Void> | seekAsync(long timestamp) | Reset the subscription associated with this consumer to a specific message publish time.
CompletableFuture<Void> | seekAsync(Function<String, Object> function) | Reset the subscription associated with this consumer to a specific message ID or message publish time asynchronously.
CompletableFuture<Void> | seekAsync(MessageId messageId) | The asynchronous version of seek(MessageId).
void | unsubscribe() | Unsubscribe the consumer.
CompletableFuture<Void> | unsubscribeAsync() | Asynchronously unsubscribe the consumer.
Methods inherited from interface org.apache.pulsar.client.api.MessageAcknowledger
acknowledge, acknowledge, acknowledge, acknowledge, acknowledgeAsync, acknowledgeAsync, acknowledgeAsync, acknowledgeAsync, acknowledgeAsync, acknowledgeAsync, acknowledgeAsync, acknowledgeCumulative, acknowledgeCumulative, acknowledgeCumulativeAsync, acknowledgeCumulativeAsync, acknowledgeCumulativeAsync
-
Method Details
-
getTopic
String getTopic()
Get a topic for the consumer.
- Returns:
- topic for the consumer
-
getSubscription
String getSubscription()
Get a subscription for the consumer.
- Returns:
- subscription for the consumer
-
unsubscribe
Unsubscribe the consumer.
This call blocks until the consumer is unsubscribed.
Unsubscribing will cause the subscription to be deleted, and all the retained data can potentially be deleted as well.
The operation will fail when performed on a shared subscription where multiple consumers are currently connected.
- Throws:
- PulsarClientException - if the operation fails
-
unsubscribeAsync
CompletableFuture<Void> unsubscribeAsync()
Asynchronously unsubscribe the consumer.
- Returns:
- CompletableFuture to track the operation
-
receive
Receives a single message in blocking mode.
This method blocks until a message is available or the consumer is closed.
Behavior when interrupted:
- If the thread is interrupted while waiting: returns null and resets the interrupted flag
- If the consumer is closed while waiting: throws PulsarClientException with the cause InterruptedException("Queue is terminated")
- Returns:
- the received message, or null if the thread was interrupted
- Throws:
- PulsarClientException - if the consumer is closed while waiting for a message. The exception will contain an InterruptedException with the message "Queue is terminated" as its cause.
- PulsarClientException.AlreadyClosedException - if the consumer was already closed before this method was called
- PulsarClientException.InvalidConfigurationException - if a message listener was defined in the configuration
-
receiveAsync
CompletableFuture<Message<T>> receiveAsync()
Receive a single message.
Retrieves a message when it becomes available and completes the CompletableFuture with the received message.
receiveAsync() should be called again only after the previously returned CompletableFuture has completed with a received message. Otherwise, a backlog of receive requests builds up in the application.
The returned future can be cancelled before completion by calling .cancel(false) (CompletableFuture.cancel(boolean)) to remove it from the backlog of receive requests. Another way to ensure proper cleanup of the returned future is to use the CompletableFuture.orTimeout method, which is available on JDK 9+. That removes it from the backlog of receive requests if receiving exceeds the timeout.
- Returns:
- CompletableFuture<Message> that will be completed when a message is available
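The cleanup options described above can be sketched with plain JDK types. This is a minimal illustration, not part of the Pulsar API: the class name ReceiveTimeoutSketch and its two helper methods are hypothetical, and the Supplier parameter stands in for a call such as consumer::receiveAsync.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper class, for illustration only.
final class ReceiveTimeoutSketch {
    private ReceiveTimeoutSketch() {}

    /**
     * Issues one receive request and bounds it with a timeout.
     * The supplier stands in for a call such as consumer::receiveAsync.
     */
    static <M> CompletableFuture<M> receiveWithTimeout(
            Supplier<CompletableFuture<M>> receiveAsync, long timeout, TimeUnit unit) {
        // orTimeout (JDK 9+) completes the same future exceptionally with a
        // TimeoutException if it has not completed within the timeout.
        return receiveAsync.get().orTimeout(timeout, unit);
    }

    /** Abandons a receive request; returns true if the future is now cancelled. */
    static boolean abandon(CompletableFuture<?> pendingReceive) {
        // The argument is ignored by CompletableFuture; false matches the text above.
        return pendingReceive.cancel(false);
    }
}
```

With a real consumer, a call might look like receiveWithTimeout(consumer::receiveAsync, 5, TimeUnit.SECONDS), issuing the next request only after the previous future has completed.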
-
receive
Receive a single message.
Retrieves a message, waiting up to the specified wait time if necessary.
If the consumer is closed during the wait, returns null immediately.
- Parameters:
- timeout - 0 or less means immediate rather than infinite
- unit - the time unit of the timeout
- Returns:
- the received Message, or null if no message is available before the timeout
- Throws:
- PulsarClientException.AlreadyClosedException - if the consumer was already closed
- PulsarClientException.InvalidConfigurationException - if a message listener was defined in the configuration
- PulsarClientException
-
batchReceive
Batch receiving messages.
This call blocks until enough messages have been received or the wait times out. See BatchReceivePolicy for details.
- Returns:
- messages
- Throws:
- PulsarClientException
- Since:
- 2.4.1
-
batchReceiveAsync
CompletableFuture<Messages<T>> batchReceiveAsync()
Batch receiving messages.
Retrieves messages once enough messages have been received or the wait times out, and completes the CompletableFuture with the received messages.
batchReceiveAsync() should be called again only after the previously returned CompletableFuture has completed with received messages. Otherwise, a backlog of receive requests builds up in the application.
The returned future can be cancelled before completion by calling .cancel(false) (CompletableFuture.cancel(boolean)) to remove it from the backlog of receive requests. Another way to ensure proper cleanup of the returned future is to use the CompletableFuture.orTimeout method, which is available on JDK 9+. That removes it from the backlog of receive requests if receiving exceeds the timeout.
- Returns:
- messages
- Throws:
- PulsarClientException
- Since:
- 2.4.1
-
negativeAcknowledge
Acknowledge the failure to process a single message.
When a message is "negatively acked" it will be marked for redelivery after some fixed delay. The delay is configurable when constructing the consumer with ConsumerBuilder.negativeAckRedeliveryDelay(long, TimeUnit).
This call is not blocking.
Example of usage:
    while (true) {
        Message<String> msg = consumer.receive();

        try {
            // Process message...

            consumer.acknowledge(msg);
        } catch (Throwable t) {
            log.warn("Failed to process message");
            consumer.negativeAcknowledge(msg);
        }
    }
- Parameters:
- message - The Message to be acknowledged
-
negativeAcknowledge
Acknowledge the failure to process a single message.
When a message is "negatively acked" it will be marked for redelivery after some fixed delay. The delay is configurable when constructing the consumer with ConsumerBuilder.negativeAckRedeliveryDelay(long, TimeUnit).
This call is not blocking.
This variation accepts a MessageId rather than a Message object, which avoids keeping the payload in memory for an extended amount of time.
- Parameters:
- messageId - The MessageId to be acknowledged
-
negativeAcknowledge
Acknowledge the failure to process Messages.
When messages are "negatively acked" they will be marked for redelivery after some fixed delay. The delay is configurable when constructing the consumer with ConsumerBuilder.negativeAckRedeliveryDelay(long, TimeUnit).
This call is not blocking.
Example of usage:
    while (true) {
        Messages<String> msgs = consumer.batchReceive();

        try {
            // Process messages...

            consumer.acknowledge(msgs);
        } catch (Throwable t) {
            log.warn("Failed to process message");
            consumer.negativeAcknowledge(msgs);
        }
    }
- Parameters:
- messages - The Messages to be acknowledged
-
reconsumeLater
reconsumeLater the consumption of Messages.
When a message is marked "reconsumeLater", it is redelivered after a custom delay.
Example of usage:
    while (true) {
        Message<String> msg = consumer.receive();

        try {
            // Process message...

            consumer.acknowledge(msg);
        } catch (Throwable t) {
            log.warn("Failed to process message");
            consumer.reconsumeLater(msg, 1000, TimeUnit.MILLISECONDS);
        }
    }
- Parameters:
- message - the Message to be reconsumed later
- delayTime - the amount of delay before the message will be delivered
- unit - the time unit for the delay
- Throws:
- PulsarClientException.AlreadyClosedException - if the consumer was already closed
- PulsarClientException
-
reconsumeLater
void reconsumeLater(Message<?> message, Map<String, String> customProperties, long delayTime, TimeUnit unit) throws PulsarClientException
reconsumeLater the consumption of Messages.
When a message is marked "reconsumeLater", it is redelivered after a custom delay.
Example of usage:
    while (true) {
        Message<String> msg = consumer.receive();

        try {
            // Process message...

            consumer.acknowledge(msg);
        } catch (Throwable t) {
            log.warn("Failed to process message");
            consumer.reconsumeLater(msg, 1000, TimeUnit.MILLISECONDS);
        }
    }
- Parameters:
- message - the Message to be reconsumed later
- customProperties - the custom properties to use when reconsuming later
- delayTime - the amount of delay before the message will be delivered
- unit - the time unit for the delay
- Throws:
- PulsarClientException.AlreadyClosedException - if the consumer was already closed
- PulsarClientException
-
reconsumeLater
void reconsumeLater(Messages<?> messages, long delayTime, TimeUnit unit) throws PulsarClientException
reconsumeLater the consumption of Messages.
- Parameters:
- messages - the messages to be reconsumed later
- delayTime - the amount of delay before the message will be delivered
- unit - the time unit for the delay
- Throws:
- PulsarClientException.AlreadyClosedException - if the consumer was already closed
- PulsarClientException
-
reconsumeLaterCumulative
void reconsumeLaterCumulative(Message<?> message, long delayTime, TimeUnit unit) throws PulsarClientException
reconsumeLater the reception of all the messages in the stream up to (and including) the provided message.
- Parameters:
- message - The message to be cumulatively reconsumed later
- delayTime - the amount of delay before the message will be delivered
- unit - the time unit for the delay
- Throws:
- PulsarClientException.AlreadyClosedException - if the consumer was already closed
- PulsarClientException
-
reconsumeLaterAsync
Asynchronously reconsumeLater the consumption of a single message.
- Parameters:
- message - The Message to be reconsumed later
- delayTime - the amount of delay before the message will be delivered
- unit - the time unit for the delay
- Returns:
- a future that can be used to track the completion of the operation
-
reconsumeLaterAsync
CompletableFuture<Void> reconsumeLaterAsync(Message<?> message, Map<String, String> customProperties, long delayTime, TimeUnit unit)
Asynchronously reconsumeLater the consumption of a single message.
- Parameters:
- message - The Message to be reconsumed later
- customProperties - The custom properties to use when reconsuming later
- delayTime - the amount of delay before the message will be delivered
- unit - the time unit for the delay
- Returns:
- a future that can be used to track the completion of the operation
-
reconsumeLaterAsync
Asynchronously reconsumeLater the consumption of Messages.
- Parameters:
- messages - The Messages to be reconsumed later
- delayTime - the amount of delay before the message will be delivered
- unit - the time unit for the delay
- Returns:
- a future that can be used to track the completion of the operation
-
reconsumeLaterCumulativeAsync
CompletableFuture<Void> reconsumeLaterCumulativeAsync(Message<?> message, long delayTime, TimeUnit unit)
Asynchronously reconsumeLater the reception of all the messages in the stream up to (and including) the provided message.
Cumulative reconsumeLater cannot be used when the consumer type is set to ConsumerShared.
- Parameters:
- message - The message to be cumulatively reconsumed later
- delayTime - the amount of delay before the message will be delivered
- unit - the time unit for the delay
- Returns:
- a future that can be used to track the completion of the operation
-
reconsumeLaterCumulativeAsync
CompletableFuture<Void> reconsumeLaterCumulativeAsync(Message<?> message, Map<String, String> customProperties, long delayTime, TimeUnit unit)
Asynchronously reconsumeLater the reception of all the messages in the stream up to (and including) the provided message.
Cumulative reconsumeLater cannot be used when the consumer type is set to ConsumerShared.
- Parameters:
- message - The message to be cumulatively reconsumed later
- customProperties - The custom properties to use when cumulatively reconsuming later
- delayTime - the amount of delay before the message will be delivered
- unit - the time unit for the delay
- Returns:
- a future that can be used to track the completion of the operation
-
getStats
ConsumerStats getStats()
Get statistics for the consumer.
- numMsgsReceived : Number of messages received in the current interval
- numBytesReceived : Number of bytes received in the current interval
- numReceiveFailed : Number of messages failed to receive in the current interval
- numAcksSent : Number of acks sent in the current interval
- numAcksFailed : Number of acks failed to send in the current interval
- totalMsgsReceived : Total number of messages received
- totalBytesReceived : Total number of bytes received
- totalReceiveFailed : Total number of messages failed to receive
- totalAcksSent : Total number of acks sent
- totalAcksFailed : Total number of acks failed to send
- Returns:
- statistics for the consumer
-
close
Close the consumer and stop the broker from pushing more messages.
- Specified by:
- close in interface AutoCloseable
- Specified by:
- close in interface Closeable
- Throws:
- PulsarClientException
-
closeAsync
CompletableFuture<Void> closeAsync()
Asynchronously close the consumer and stop the broker from pushing more messages.
- Returns:
- a future that can be used to track the completion of the operation
-
hasReachedEndOfTopic
boolean hasReachedEndOfTopic()
Return true if the topic was terminated and this consumer has already consumed all the messages in the topic.
Please note that this does not simply mean that the consumer is caught up with the last message published by producers; rather, the topic needs to be explicitly "terminated".
-
redeliverUnacknowledgedMessages
void redeliverUnacknowledgedMessages()
Redelivers all the unacknowledged messages. In Failover mode, the request is ignored if the consumer is not active for the given topic. In Shared mode, the consumer's messages to be redelivered are distributed across all the connected consumers. This is a non-blocking call and doesn't throw an exception. If the connection breaks, the messages are redelivered after reconnecting.
-
seek
Reset the subscription associated with this consumer to a specific message id.
The message id can either be a specific message or represent the first or last messages in the topic.
- MessageId.earliest: Reset the subscription on the earliest message available in the topic
- MessageId.latest: Reset the subscription on the latest message in the topic
This effectively resets the acknowledgement state of the subscription: all messages up to and including messageId will be marked as acknowledged and the rest unacknowledged.
Note: For a multi-topics consumer, if messageId is a TopicMessageId, the seek operation will happen on the owner topic of the message, which is returned by TopicMessageId.getOwnerTopic(). Otherwise, you can only seek to the earliest or latest message for all subscribed topics.
- Parameters:
- messageId - the message id where to reposition the subscription
- Throws:
- PulsarClientException
-
seek
Reset the subscription associated with this consumer to a specific message publish time.
- Parameters:
- timestamp - the message publish time where to reposition the subscription. The timestamp format should be Unix time in milliseconds.
- Throws:
- PulsarClientException
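Since the timestamp is Unix time in milliseconds, one way to derive it is from java.time. The following is a small sketch under that assumption; the class SeekTimestampSketch and the method publishTimeBefore are hypothetical names and not part of the Pulsar API.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper class, for illustration only.
final class SeekTimestampSketch {
    private SeekTimestampSketch() {}

    /** Returns the Unix time in milliseconds that lies `lookback` before `now`. */
    static long publishTimeBefore(Instant now, Duration lookback) {
        return now.minus(lookback).toEpochMilli();
    }
}
```

For example, consumer.seek(SeekTimestampSketch.publishTimeBefore(Instant.now(), Duration.ofHours(1))) would reposition the subscription to messages published about one hour ago.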
-
seek
Reset the subscription associated with this consumer to a specific message ID or message publish time.
The Function input is topic+partition. It must return only a timestamp or a MessageId.
The return value is the seek position/timestamp of the current partition. An exception is thrown if any other object type is returned.
If the function returns null, no seek is performed on the current partition. An exception in one partition may affect other partitions.
- Parameters:
- function - maps a topic+partition to the timestamp or MessageId to seek to
- Throws:
- PulsarClientException
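A function of the shape described above can be sketched as follows. This is only an illustration: the class SeekFunctionSketch and the method seekByPartition are hypothetical, the sketch returns timestamps only (a MessageId would be equally valid), and the exact format of the topic+partition string passed in by the client is an assumption.

```java
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper class, for illustration only.
final class SeekFunctionSketch {
    private SeekFunctionSketch() {}

    /**
     * Builds a seek function from a map of topic+partition name to publish
     * time (Unix time in milliseconds).
     */
    static Function<String, Object> seekByPartition(Map<String, Long> timestampsByPartition) {
        // Map.get returns null for partitions that are not listed; per the
        // description above, a null result means no seek on that partition.
        return topicPartition -> timestampsByPartition.get(topicPartition);
    }
}
```

The resulting function could then be passed to seek(function) or seekAsync(function); partitions absent from the map are left where they are.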
-
seekAsync
Reset the subscription associated with this consumer to a specific message ID or message publish time asynchronously.
The Function input is topic+partition. It must return only a timestamp or a MessageId.
The return value is the seek position/timestamp of the current partition. An exception is thrown if any other object type is returned.
If the function returns null, no seek is performed on the current partition. An exception in one partition may affect other partitions.
- Parameters:
- function - maps a topic+partition to the timestamp or MessageId to seek to
-
seekAsync
The asynchronous version of seek(MessageId).
-
seekAsync
Reset the subscription associated with this consumer to a specific message publish time.
- Parameters:
- timestamp - the message publish time where to reposition the subscription. The timestamp format should be Unix time in milliseconds.
- Returns:
- a future to track the completion of the seek operation
-
getLastMessageId
Deprecated. Use getLastMessageIds() instead.
Get the last message id available for consumption.
- Returns:
- the last message id.
- Throws:
- PulsarClientException
-
getLastMessageIdAsync
Deprecated. Use getLastMessageIdsAsync() instead.
Get the last message id available for consumption.
- Returns:
- a future that can be used to track the completion of the operation.
-
getLastMessageIds
Get the last message ids of all the topics the consumer is subscribed to.
- Returns:
- the list of TopicMessageId instances of all the topics that the consumer is subscribed to
- Throws:
- PulsarClientException - if it fails to get the last message ids.
-
getLastMessageIdsAsync
CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync()
The asynchronous version of getLastMessageIds().
-
isConnected
boolean isConnected()
- Returns:
- Whether the consumer is connected to the broker
-
getConsumerName
String getConsumerName()
Get the name of consumer.
- Returns:
- consumer name.
-
pause
void pause()
Stop requesting new messages from the broker until resume() is called.
-
resume
void resume()
Resume requesting messages from the broker.
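One possible use of pause() and resume() is simple flow control around a slow processing stage. The sketch below is an assumption about how an application might do this, not something the API prescribes: the class PauseResumeGate, its watermark parameters, and its method names are all hypothetical, and the two Runnable arguments stand in for consumer::pause and consumer::resume.

```java
// Hypothetical flow-control helper, for illustration only.
final class PauseResumeGate {
    private final Runnable pause;
    private final Runnable resume;
    private final int highWatermark;
    private final int lowWatermark;
    private int inFlight;
    private boolean paused;

    PauseResumeGate(Runnable pause, Runnable resume, int highWatermark, int lowWatermark) {
        if (lowWatermark < 0 || lowWatermark >= highWatermark) {
            throw new IllegalArgumentException("require 0 <= lowWatermark < highWatermark");
        }
        this.pause = pause;
        this.resume = resume;
        this.highWatermark = highWatermark;
        this.lowWatermark = lowWatermark;
    }

    /** Call when a message is handed to the processing stage. */
    synchronized void messageQueued() {
        inFlight++;
        if (!paused && inFlight >= highWatermark) {
            paused = true;
            pause.run(); // e.g. consumer.pause()
        }
    }

    /** Call when the processing stage has finished with a message. */
    synchronized void messageDone() {
        inFlight--;
        if (paused && inFlight <= lowWatermark) {
            paused = false;
            resume.run(); // e.g. consumer.resume()
        }
    }

    synchronized boolean isPaused() {
        return paused;
    }
}
```

A gate might be created as new PauseResumeGate(consumer::pause, consumer::resume, 1000, 200). Using two watermarks rather than one avoids toggling between paused and resumed on every message.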
getLastDisconnectedTimestamp
long getLastDisconnectedTimestamp()
- Returns:
- The last disconnected timestamp of the consumer