Interface Consumer<T>

All Superinterfaces:
AutoCloseable, Closeable, MessageAcknowledger

@Public @Stable public interface Consumer<T> extends Closeable, MessageAcknowledger
An interface that abstracts the behavior of a Pulsar consumer.

All the operations on the consumer instance are thread safe.

  • Method Details

    • getTopic

      String getTopic()
      Get a topic for the consumer.
      Returns:
      topic for the consumer
    • getSubscription

      String getSubscription()
      Get a subscription for the consumer.
      Returns:
      subscription for the consumer
    • unsubscribe

      void unsubscribe() throws PulsarClientException
      Unsubscribe the consumer.

      This call blocks until the consumer is unsubscribed.

      Unsubscribing will cause the subscription to be deleted, and all the retained data can potentially be deleted as well.

      The operation will fail when performed on a shared subscription where multiple consumers are currently connected.

      Throws:
      PulsarClientException - if the operation fails
    • unsubscribeAsync

      CompletableFuture<Void> unsubscribeAsync()
      Asynchronously unsubscribe the consumer.
      Returns:
      CompletableFuture to track the operation
      See Also:
    • unsubscribe

      void unsubscribe(boolean force) throws PulsarClientException
      Unsubscribe the consumer.

      This call blocks until the consumer is unsubscribed.

      Unsubscribing will cause the subscription to be deleted, and all the retained data can potentially be deleted as well.

      The operation will fail when performed on a shared subscription where multiple consumers are currently connected.

      Parameters:
      force - forcefully unsubscribe by disconnecting connected consumers.
      Throws:
      PulsarClientException - if the operation fails
    • unsubscribeAsync

      CompletableFuture<Void> unsubscribeAsync(boolean force)
      Asynchronously unsubscribe the consumer.
      Parameters:
      force - forcefully unsubscribe by disconnecting connected consumers.
      Returns:
      CompletableFuture to track the operation
      See Also:
    • receive

      Message<T> receive() throws PulsarClientException
      Receives a single message.

      This call blocks until a message is available.

      If the thread is interrupted, this method returns null and resets the interrupted flag.

      Returns:
      the received message
      Throws:
      PulsarClientException.AlreadyClosedException - if the consumer was already closed
      PulsarClientException.InvalidConfigurationException - if a message listener was defined in the configuration
      PulsarClientException
    • receiveAsync

      CompletableFuture<Message<T>> receiveAsync()
      Receive a single message.

      Retrieves a message when one becomes available and completes the CompletableFuture with the received message.

      Call receiveAsync() again only after the previously returned CompletableFuture has completed with a received message. Otherwise, a backlog of receive requests builds up in the application.

      The returned future can be cancelled before completion by calling .cancel(false) (CompletableFuture.cancel(boolean)) to remove it from the backlog of receive requests. Another way to ensure proper cleanup of the returned future is the CompletableFuture.orTimeout method, available on JDK 9+, which removes the future from the backlog of receive requests if receiving exceeds the timeout.

      Returns:
      a CompletableFuture<Message> that will be completed when a message is available
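
      The two cleanup options described above can be sketched with the JDK alone. In this minimal sketch, plain CompletableFuture instances stand in for the future that receiveAsync() would return; the class and method names (PendingReceiveCleanup, cancelPending, timesOut) are hypothetical and not part of the Pulsar API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class PendingReceiveCleanup {

    /** Cancels a pending future without interrupting; true if it is now cancelled. */
    static boolean cancelPending(CompletableFuture<?> pending) {
        pending.cancel(false);
        return pending.isCancelled();
    }

    /** Arms a timeout (JDK 9+) and reports whether the future failed with a TimeoutException. */
    static boolean timesOut(CompletableFuture<?> pending, long timeoutMillis) {
        try {
            pending.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS).join();
            return false; // completed normally before the timeout
        } catch (CompletionException e) {
            return e.getCause() instanceof TimeoutException;
        }
    }

    public static void main(String[] args) {
        // Stand-ins (assumption) for futures returned by consumer.receiveAsync().
        CompletableFuture<String> first = new CompletableFuture<>();
        CompletableFuture<String> second = new CompletableFuture<>();

        System.out.println("cancelled: " + cancelPending(first));
        System.out.println("timed out: " + timesOut(second, 50));
    }
}
```

      With a real consumer, the same calls would be applied directly to the future returned by receiveAsync().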
    • receive

      Message<T> receive(int timeout, TimeUnit unit) throws PulsarClientException
      Receive a single message.

      Retrieves a message, waiting up to the specified wait time if necessary.

      Parameters:
      timeout - the maximum time to wait; 0 or less means return immediately rather than wait indefinitely
      unit - the time unit of the timeout argument
      Returns:
      the received Message, or null if no message is available before the timeout
      Throws:
      PulsarClientException.AlreadyClosedException - if the consumer was already closed
      PulsarClientException.InvalidConfigurationException - if a message listener was defined in the configuration
      PulsarClientException
    • batchReceive

      Messages<T> batchReceive() throws PulsarClientException
      Receive a batch of messages.

      This call blocks until enough messages are available or the wait timeout expires; see BatchReceivePolicy for details.

      Returns:
      messages
      Throws:
      PulsarClientException
      Since:
      2.4.1
    • batchReceiveAsync

      CompletableFuture<Messages<T>> batchReceiveAsync()
      Asynchronously receive a batch of messages.

      Retrieves messages once enough messages are available or the wait timeout expires, and completes the CompletableFuture with the received messages.

      Call batchReceiveAsync() again only after the previously returned CompletableFuture has completed with received messages. Otherwise, a backlog of receive requests builds up in the application.

      The returned future can be cancelled before completion by calling .cancel(false) (CompletableFuture.cancel(boolean)) to remove it from the backlog of receive requests. Another way to ensure proper cleanup of the returned future is the CompletableFuture.orTimeout method, available on JDK 9+, which removes the future from the backlog of receive requests if receiving exceeds the timeout.

      Returns:
      messages
      Throws:
      PulsarClientException
      Since:
      2.4.1
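
      The advice to issue the next request only after the previous future completes can be sketched as a chained loop. This is a minimal sketch under stated assumptions: a Supplier stands in for consumer::batchReceiveAsync, and the names SequentialReceiveSketch and receiveSequentially are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

class SequentialReceiveSketch {

    /**
     * Issues one receive request at a time: the next request is made only
     * after the previous future has completed, so no backlog builds up.
     */
    static <T> CompletableFuture<Void> receiveSequentially(
            Supplier<CompletableFuture<T>> receiveAsync, int remaining, List<T> sink) {
        if (remaining <= 0) {
            return CompletableFuture.completedFuture(null);
        }
        return receiveAsync.get().thenCompose(batch -> {
            sink.add(batch); // process the received value here
            return receiveSequentially(receiveAsync, remaining - 1, sink);
        });
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for consumer::batchReceiveAsync.
        Supplier<CompletableFuture<String>> fakeReceive =
                () -> CompletableFuture.completedFuture("batch");
        List<String> received = new ArrayList<>();
        receiveSequentially(fakeReceive, 3, received).join();
        System.out.println(received.size()); // prints 3
    }
}
```

      The sketch bounds the number of requests to keep the recursion shallow; a long-running loop would typically hand each continuation to an executor instead.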
    • negativeAcknowledge

      void negativeAcknowledge(Message<?> message)
      Acknowledge the failure to process a single message.

      When a message is "negatively acked" it will be marked for redelivery after some fixed delay. The delay is configurable when constructing the consumer with ConsumerBuilder.negativeAckRedeliveryDelay(long, TimeUnit).

      This call is not blocking.

      Example of usage:

      
       while (true) {
           Message<String> msg = consumer.receive();
      
           try {
                // Process message...
      
                consumer.acknowledge(msg);
           } catch (Throwable t) {
                log.warn("Failed to process message", t);
                consumer.negativeAcknowledge(msg);
           }
       }
       
      Parameters:
      message - The Message to be negatively acknowledged
    • negativeAcknowledge

      void negativeAcknowledge(MessageId messageId)
      Acknowledge the failure to process a single message.

      When a message is "negatively acked" it will be marked for redelivery after some fixed delay. The delay is configurable when constructing the consumer with ConsumerBuilder.negativeAckRedeliveryDelay(long, TimeUnit).

      This call is not blocking.

      This variation accepts a MessageId rather than a Message object, which avoids keeping the payload in memory for an extended amount of time.

      Parameters:
      messageId - The MessageId to be negatively acknowledged
      See Also:
    • negativeAcknowledge

      void negativeAcknowledge(Messages<?> messages)
      Acknowledge the failure to process Messages.

      When messages are "negatively acked" they will be marked for redelivery after some fixed delay. The delay is configurable when constructing the consumer with ConsumerBuilder.negativeAckRedeliveryDelay(long, TimeUnit).

      This call is not blocking.

      Example of usage:

      
       while (true) {
           Messages<String> msgs = consumer.batchReceive();
      
           try {
                // Process messages...
      
                consumer.acknowledge(msgs);
           } catch (Throwable t) {
                log.warn("Failed to process messages", t);
                consumer.negativeAcknowledge(msgs);
           }
       }
       
      Parameters:
      messages - The Messages to be negatively acknowledged
    • reconsumeLater

      void reconsumeLater(Message<?> message, long delayTime, TimeUnit unit) throws PulsarClientException
      reconsumeLater the consumption of a single message.

      When a message is "reconsumeLater" it will be marked for redelivery after some custom delay.

      Example of usage:

      
       while (true) {
           Message<String> msg = consumer.receive();
      
           try {
                // Process message...
      
                consumer.acknowledge(msg);
           } catch (Throwable t) {
                log.warn("Failed to process message", t);
                consumer.reconsumeLater(msg, 1000, TimeUnit.MILLISECONDS);
           }
       }
       
      Parameters:
      message - the Message to be reconsumeLater
      delayTime - the amount of delay before the message will be delivered
      unit - the time unit for the delay
      Throws:
      PulsarClientException.AlreadyClosedException - if the consumer was already closed
      PulsarClientException
    • reconsumeLater

      void reconsumeLater(Message<?> message, Map<String,String> customProperties, long delayTime, TimeUnit unit) throws PulsarClientException
      reconsumeLater the consumption of a single message, attaching custom properties.

      When a message is "reconsumeLater" it will be marked for redelivery after some custom delay.

      Example of usage:

      
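       while (true) {
           Message<String> msg = consumer.receive();
      
           try {
                // Process message...
      
                consumer.acknowledge(msg);
           } catch (Throwable t) {
                log.warn("Failed to process message", t);
                // Hypothetical property name, for illustration only.
                Map<String, String> customProperties = new HashMap<>();
                customProperties.put("failure-reason", String.valueOf(t.getMessage()));
                consumer.reconsumeLater(msg, customProperties, 1000, TimeUnit.MILLISECONDS);
           }
       }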
       
      Parameters:
      message - the Message to be reconsumeLater
      customProperties - the custom properties to attach to the message that is reconsumed later
      delayTime - the amount of delay before the message will be delivered
      unit - the time unit for the delay
      Throws:
      PulsarClientException.AlreadyClosedException - if the consumer was already closed
      PulsarClientException
    • reconsumeLater

      void reconsumeLater(Messages<?> messages, long delayTime, TimeUnit unit) throws PulsarClientException
      reconsumeLater the consumption of Messages.
      Parameters:
      messages - the messages to be reconsumeLater
      delayTime - the amount of delay before the message will be delivered
      unit - the time unit for the delay
      Throws:
      PulsarClientException.AlreadyClosedException - if the consumer was already closed
      PulsarClientException
    • reconsumeLaterCumulative

      void reconsumeLaterCumulative(Message<?> message, long delayTime, TimeUnit unit) throws PulsarClientException
      reconsumeLater the reception of all the messages in the stream up to (and including) the provided message.
      Parameters:
      message - The message to be cumulatively reconsumeLater
      delayTime - the amount of delay before the message will be delivered
      unit - the time unit for the delay
      Throws:
      PulsarClientException.AlreadyClosedException - if the consumer was already closed
      PulsarClientException
    • reconsumeLaterAsync

      CompletableFuture<Void> reconsumeLaterAsync(Message<?> message, long delayTime, TimeUnit unit)
      Asynchronously reconsumeLater the consumption of a single message.
      Parameters:
      message - The Message to be reconsumeLater
      delayTime - the amount of delay before the message will be delivered
      unit - the time unit for the delay
      Returns:
      a future that can be used to track the completion of the operation
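
      Tracking the completion of such a future needs nothing beyond the JDK. The following minimal sketch uses plain CompletableFuture instances as stand-ins for the future returned by reconsumeLaterAsync(...); the names TrackCompletionSketch and outcome are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

class TrackCompletionSketch {

    /** Waits for the operation and summarizes how it ended. */
    static String outcome(CompletableFuture<Void> operation) {
        return operation
                .handle((ignored, error) -> error == null ? "done" : "failed: " + error.getMessage())
                .join();
    }

    public static void main(String[] args) {
        // Stand-ins (assumption) for futures returned by reconsumeLaterAsync(...).
        CompletableFuture<Void> ok = CompletableFuture.completedFuture(null);
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("consumer closed"));

        System.out.println(outcome(ok));     // prints "done"
        System.out.println(outcome(failed)); // prints "failed: consumer closed"
    }
}
```

      In non-blocking code, the handle(...) callback would usually log or retry instead of being joined.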
    • reconsumeLaterAsync

      CompletableFuture<Void> reconsumeLaterAsync(Message<?> message, Map<String,String> customProperties, long delayTime, TimeUnit unit)
      Asynchronously reconsumeLater the consumption of a single message.
      Parameters:
      message - The Message to be reconsumeLater
      customProperties - The custom properties to be reconsumeLater
      delayTime - the amount of delay before the message will be delivered
      unit - the time unit for the delay
      Returns:
      a future that can be used to track the completion of the operation
    • reconsumeLaterAsync

      CompletableFuture<Void> reconsumeLaterAsync(Messages<?> messages, long delayTime, TimeUnit unit)
      Asynchronously reconsumeLater the consumption of Messages.
      Parameters:
      messages - The Messages to be reconsumeLater
      delayTime - the amount of delay before the message will be delivered
      unit - the time unit for the delay
      Returns:
      a future that can be used to track the completion of the operation
    • reconsumeLaterCumulativeAsync

      CompletableFuture<Void> reconsumeLaterCumulativeAsync(Message<?> message, long delayTime, TimeUnit unit)
      Asynchronously reconsumeLater the reception of all the messages in the stream up to (and including) the provided message.

      Cumulative reconsumeLater cannot be used when the consumer type is set to ConsumerShared.

      Parameters:
      message - The message to be cumulatively reconsumeLater
      delayTime - the amount of delay before the message will be delivered
      unit - the time unit for the delay
      Returns:
      a future that can be used to track the completion of the operation
    • reconsumeLaterCumulativeAsync

      CompletableFuture<Void> reconsumeLaterCumulativeAsync(Message<?> message, Map<String,String> customProperties, long delayTime, TimeUnit unit)
      Asynchronously reconsumeLater the reception of all the messages in the stream up to (and including) the provided message.

      Cumulative reconsumeLater cannot be used when the consumer type is set to ConsumerShared.

      Parameters:
      message - The message to be cumulatively reconsumeLater
      customProperties - The custom properties to be cumulatively reconsumeLater
      delayTime - the amount of delay before the message will be delivered
      unit - the time unit for the delay
      Returns:
      a future that can be used to track the completion of the operation
    • getStats

      ConsumerStats getStats()
      Get statistics for the consumer.
      • numMsgsReceived : Number of messages received in the current interval
      • numBytesReceived : Number of bytes received in the current interval
      • numReceiveFailed : Number of messages failed to receive in the current interval
      • numAcksSent : Number of acks sent in the current interval
      • numAcksFailed : Number of acks failed to send in the current interval
      • totalMsgsReceived : Total number of messages received
      • totalBytesReceived : Total number of bytes received
      • totalReceiveFailed : Total number of messages failed to receive
      • totalAcksSent : Total number of acks sent
      • totalAcksFailed : Total number of acks failed to send
      Returns:
      statistics for the consumer
    • close

      void close() throws PulsarClientException
      Close the consumer and stop the broker from pushing more messages.
      Specified by:
      close in interface AutoCloseable
      Specified by:
      close in interface Closeable
      Throws:
      PulsarClientException
    • closeAsync

      CompletableFuture<Void> closeAsync()
      Asynchronously close the consumer and stop the broker from pushing more messages.
      Returns:
      a future that can be used to track the completion of the operation
    • hasReachedEndOfTopic

      boolean hasReachedEndOfTopic()
      Return true if the topic was terminated and this consumer has already consumed all the messages in the topic.

      Please note that this does not simply mean that the consumer is caught up with the last message published by producers, rather the topic needs to be explicitly "terminated".

    • redeliverUnacknowledgedMessages

      void redeliverUnacknowledgedMessages()
      Redelivers all the unacknowledged messages. In Failover mode, the request is ignored if the consumer is not active for the given topic. In Shared mode, the consumer's messages to be redelivered are distributed across all the connected consumers. This is a non-blocking call and does not throw an exception. If the connection breaks, the messages are redelivered after reconnecting.
    • seek

      void seek(MessageId messageId) throws PulsarClientException
      Reset the subscription associated with this consumer to a specific message id.

      If there is already a seek operation in progress, the method will log a warning and fail with a PulsarClientException.

      The message id can either be a specific message or represent the first or last messages in the topic.

      • MessageId.earliest : Reset the subscription on the earliest message available in the topic
      • MessageId.latest : Reset the subscription on the latest message in the topic

      This effectively resets the acknowledgement state of the subscription: all messages up to and including messageId will be marked as acknowledged and the rest unacknowledged.

      Note: For a multi-topic consumer, if messageId is a TopicMessageId, the seek operation happens on the owner topic of the message, which is returned by TopicMessageId.getOwnerTopic(). Otherwise, you can only seek to the earliest or latest message for all subscribed topics.

      Parameters:
      messageId - the message id where to reposition the subscription
      Throws:
      PulsarClientException
    • seek

      void seek(long timestamp) throws PulsarClientException
      Reset the subscription associated with this consumer to a specific message publish time.

      If there is already a seek operation in progress, the method will log a warning and fail with a PulsarClientException.

      Parameters:
      timestamp - the message publish time to which the subscription is repositioned, as Unix time in milliseconds
      Throws:
      PulsarClientException
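
      Since the timestamp is Unix time in milliseconds, it can be derived with java.time. The following minimal sketch computes the value for "one hour ago"; the names SeekTimestamps and millisBefore are hypothetical helpers, not part of the Pulsar API.

```java
import java.time.Duration;
import java.time.Instant;

class SeekTimestamps {

    /** Returns Unix time in milliseconds for the instant lookBack before now. */
    static long millisBefore(Instant now, Duration lookBack) {
        return now.minus(lookBack).toEpochMilli();
    }

    public static void main(String[] args) {
        long oneHourAgo = millisBefore(Instant.now(), Duration.ofHours(1));
        // With a real consumer, this value would be passed as consumer.seek(oneHourAgo).
        System.out.println("seek timestamp (ms): " + oneHourAgo);
    }
}
```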
    • seek

      void seek(Function<String,Object> function) throws PulsarClientException
      Reset the subscription associated with this consumer to a specific message ID or message publish time.

      If there is already a seek operation in progress, the method will log a warning and fail with a PulsarClientException.

      The Function input is topic+partition. It returns only a timestamp or a MessageId.

      The return value is the seek position/timestamp of the current partition. An exception is thrown if any other object type is returned.

      If the function returns null, the current partition is not processed. An exception in one partition may affect other partitions.

      Parameters:
      function - maps each topic+partition to its seek position (a timestamp or a MessageId), or to null to skip that partition
      Throws:
      PulsarClientException
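
      A seek function of this shape can be sketched with java.util.function alone. This minimal sketch returns a timestamp for one partition and null for the rest; the names SeekFunctionSketch and rewindOnly, and the partition names in main, are hypothetical and chosen only for illustration.

```java
import java.util.function.Function;

class SeekFunctionSketch {

    /**
     * Builds a seek function that rewinds only the partition whose name ends
     * with the given suffix; every other partition gets null and is skipped.
     */
    static Function<String, Object> rewindOnly(String partitionSuffix, long timestampMillis) {
        return topicPartition -> topicPartition.endsWith(partitionSuffix)
                ? Long.valueOf(timestampMillis)
                : null;
    }

    public static void main(String[] args) {
        Function<String, Object> fn = rewindOnly("-partition-0", 1_700_000_000_000L);
        // Hypothetical partition names, for illustration only.
        System.out.println(fn.apply("my-topic-partition-0")); // prints 1700000000000
        System.out.println(fn.apply("my-topic-partition-1")); // prints null
    }
}
```

      With a real consumer, the resulting function would be passed to seek(Function) or seekAsync(Function); returning a MessageId instead of a Long is the other accepted option.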
    • seekAsync

      CompletableFuture<Void> seekAsync(Function<String,Object> function)
      Reset the subscription associated with this consumer to a specific message ID or message publish time asynchronously.

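      The Function input is topic+partition. It returns only a timestamp or a MessageId.

      The return value is the seek position/timestamp of the current partition. An exception is thrown if any other object type is returned.

      If the function returns null, the current partition is not processed. An exception in one partition may affect other partitions.

      Parameters:
      function - maps each topic+partition to its seek position (a timestamp or a MessageId), or to null to skip that partition
      Returns:
      a future to track the completion of the seek operation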
    • seekAsync

      CompletableFuture<Void> seekAsync(MessageId messageId)
      The asynchronous version of seek(MessageId).

      If there is already a seek operation in progress, the method will log a warning and return a future completed exceptionally.

    • seekAsync

      CompletableFuture<Void> seekAsync(long timestamp)
      Reset the subscription associated with this consumer to a specific message publish time.

      If there is already a seek operation in progress, the method will log a warning and return a future completed exceptionally.

      Parameters:
      timestamp - the message publish time to which the subscription is repositioned, as Unix time in milliseconds
      Returns:
      a future to track the completion of the seek operation
    • getLastMessageId

      @Deprecated MessageId getLastMessageId() throws PulsarClientException
      Deprecated.
      Use getLastMessageIds() instead.
      Get the last message id available for consumption.
      Returns:
      the last message id.
      Throws:
      PulsarClientException
    • getLastMessageIdAsync

      @Deprecated CompletableFuture<MessageId> getLastMessageIdAsync()
      Deprecated.
      Get the last message id available for consumption.
      Returns:
      a future that can be used to track the completion of the operation.
    • getLastMessageIds

      List<TopicMessageId> getLastMessageIds() throws PulsarClientException
      Get the last message IDs of all the topics the consumer is subscribed to.
      Returns:
      the list of TopicMessageId instances for all the topics the consumer is subscribed to
      Throws:
      PulsarClientException - if the last message IDs cannot be retrieved
    • getLastMessageIdsAsync

      CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync()
      The asynchronous version of getLastMessageIds().
    • isConnected

      boolean isConnected()
      Returns:
      Whether the consumer is connected to the broker
    • getConsumerName

      String getConsumerName()
      Get the name of the consumer.
      Returns:
      consumer name.
    • pause

      void pause()
      Stop requesting new messages from the broker until resume() is called. Note that this might cause receive() to block until resume() is called and new messages are pushed by the broker.
    • resume

      void resume()
      Resume requesting messages from the broker.
    • getLastDisconnectedTimestamp

      long getLastDisconnectedTimestamp()
      Returns:
      The last disconnected timestamp of the consumer