class Consumer: (source)
Pulsar consumer.
Examples
import pulsar client = pulsar.Client('pulsar://localhost:6650') consumer = client.subscribe('my-topic', 'my-subscription') while True: msg = consumer.receive() try: print("Received message '{}' id='{}'".format(msg.data(), msg.message_id())) consumer.acknowledge(msg) except Exception: consumer.negative_acknowledge(msg) client.close()
Method | acknowledge |
Acknowledge the reception of a single message. |
Method | acknowledge |
Acknowledge the reception of all the messages in the stream up to (and including) the provided message. |
Method | batch |
Batch receiving messages. |
Method | close |
Close the consumer. |
Method | get |
Get the last message id. |
Method | is |
Check if the consumer is connected or not. |
Method | negative |
Acknowledge the failure to process a single message. |
Method | pause |
Pause receiving messages via the message_listener until resume_message_listener() is called. |
Method | receive |
Receive a single message. |
Method | redeliver |
Redelivers all the unacknowledged messages. In failover mode, the request is ignored if the consumer is not active for the given topic. In shared mode, the consumer's messages to be redelivered are distributed across all the connected consumers... |
Method | resume |
Resume receiving the messages via the message listener. Asynchronously receive all the messages enqueued from the time pause_message_listener() was called. |
Method | seek |
Reset the subscription associated with this consumer to a specific message id or publish timestamp. The message id can either be a specific message or represent the first or last messages in the topic. ... |
Method | subscription |
Return the subscription name. |
Method | topic |
Return the topic this consumer is subscribed to. |
Method | unsubscribe |
Unsubscribe the current consumer from the topic. |
Acknowledge the reception of a single message.
This method will block until an acknowledgement is sent to the broker. After that, the message will not be re-delivered to this consumer.
Parameters | |
message:Message , _pulsar.Message , _pulsar.MessageId | The received message or message id. |
Raises | |
OperationNotSupported | if message is not allowed to acknowledge |
Acknowledge the reception of all the messages in the stream up to (and including) the provided message.
This method will block until an acknowledgement is sent to the broker. After that, the messages will not be re-delivered to this consumer.
Parameters | |
message | The received message or message id. |
Raises | |
CumulativeAcknowledgementNotAllowedError | if the consumer type is ConsumerType.KeyShared or ConsumerType.Shared |
Batch receiving messages.
This calls blocks until has enough messages or wait timeout, more details to see {@link BatchReceivePolicy}.
Acknowledge the failure to process a single message.
When a message is "negatively acked" it will be marked for redelivery after some fixed delay. The delay is configurable when constructing the consumer with {@link ConsumerConfiguration#setNegativeAckRedeliveryDelayMs}.
This call is not blocking.
Parameters | |
message | The received message or message id. |
Receive a single message.
If a message is not immediately available, this method will block until a new message is available.
Parameters | |
timeoutint , optional | If specified, the receiver will raise an exception if a message is not available within the timeout. |
Redelivers all the unacknowledged messages. In failover mode, the request is ignored if the consumer is not active for the given topic. In shared mode, the consumer's messages to be redelivered are distributed across all the connected consumers. This is a non-blocking call and doesn't throw an exception. In case the connection breaks, the messages are redelivered after reconnect.
Resume receiving the messages via the message listener.
Asynchronously receive all the messages enqueued from the time
pause_message_listener()
was called.
Reset the subscription associated with this consumer to a specific message id or publish timestamp. The message id can either be a specific message or represent the first or last messages in the topic. Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the seek() on the individual partitions.
Parameters | |
messageid | The message id for seek, OR an integer event time to seek to |