class documentation

class Consumer: (source)

View In Hierarchy

Pulsar consumer.

Examples

import pulsar

client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe('my-topic', 'my-subscription')
while True:
    msg = consumer.receive()
    try:
        print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
        consumer.acknowledge(msg)
    except Exception:
        consumer.negative_acknowledge(msg)
client.close()
Method acknowledge Acknowledge the reception of a single message.
Method acknowledge_cumulative Acknowledge the reception of all the messages in the stream up to (and including) the provided message.
Method batch_receive Batch receiving messages.
Method close Close the consumer.
Method get_last_message_id Get the last message id.
Method is_connected Check if the consumer is connected or not.
Method negative_acknowledge Acknowledge the failure to process a single message.
Method pause_message_listener Pause receiving messages via the message_listener until resume_message_listener() is called.
Method receive Receive a single message.
Method redeliver_unacknowledged_messages Redelivers all the unacknowledged messages. In failover mode, the request is ignored if the consumer is not active for the given topic. In shared mode, the consumer's messages to be redelivered are distributed across all the connected consumers...
Method resume_message_listener Resume receiving the messages via the message listener. Asynchronously receive all the messages enqueued from the time pause_message_listener() was called.
Method seek Reset the subscription associated with this consumer to a specific message id or publish timestamp. The message id can either be a specific message or represent the first or last messages in the topic. ...
Method subscription_name Return the subscription name.
Method topic Return the topic this consumer is subscribed to.
Method unsubscribe Unsubscribe the current consumer from the topic.
def acknowledge(self, message): (source)

Acknowledge the reception of a single message.

This method will block until an acknowledgement is sent to the broker. After that, the message will not be re-delivered to this consumer.

Parameters
messageThe received message or message id.
def acknowledge_cumulative(self, message): (source)

Acknowledge the reception of all the messages in the stream up to (and including) the provided message.

This method will block until an acknowledgement is sent to the broker. After that, the messages will not be re-delivered to this consumer.

Parameters
messageThe received message or message id.
def batch_receive(self): (source)

Batch receiving messages.

This calls blocks until has enough messages or wait timeout, more details to see {@link BatchReceivePolicy}.

def close(self): (source)

Close the consumer.

def get_last_message_id(self): (source)

Get the last message id.

def is_connected(self): (source)

Check if the consumer is connected or not.

def negative_acknowledge(self, message): (source)

Acknowledge the failure to process a single message.

When a message is "negatively acked" it will be marked for redelivery after some fixed delay. The delay is configurable when constructing the consumer with {@link ConsumerConfiguration#setNegativeAckRedeliveryDelayMs}.

This call is not blocking.

Parameters
messageThe received message or message id.
def pause_message_listener(self): (source)

Pause receiving messages via the message_listener until resume_message_listener() is called.

def receive(self, timeout_millis=None): (source)

Receive a single message.

If a message is not immediately available, this method will block until a new message is available.

Parameters
timeout_millis:int, optionalIf specified, the receiver will raise an exception if a message is not available within the timeout.
def redeliver_unacknowledged_messages(self): (source)

Redelivers all the unacknowledged messages. In failover mode, the request is ignored if the consumer is not active for the given topic. In shared mode, the consumer's messages to be redelivered are distributed across all the connected consumers. This is a non-blocking call and doesn't throw an exception. In case the connection breaks, the messages are redelivered after reconnect.

def resume_message_listener(self): (source)

Resume receiving the messages via the message listener. Asynchronously receive all the messages enqueued from the time pause_message_listener() was called.

def seek(self, messageid): (source)

Reset the subscription associated with this consumer to a specific message id or publish timestamp. The message id can either be a specific message or represent the first or last messages in the topic. Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the seek() on the individual partitions.

Parameters
messageidThe message id for seek, OR an integer event time to seek to
def subscription_name(self): (source)

Return the subscription name.

def topic(self): (source)

Return the topic this consumer is subscribed to.

def unsubscribe(self): (source)

Unsubscribe the current consumer from the topic.

This method will block until the operation is completed. Once the consumer is unsubscribed, no more messages will be received and subsequent new messages will not be retained for this consumer.

This consumer object cannot be reused.