The transaction feature is primarily a server-side and protocol-level feature. You can use the transaction feature via the transaction API, which is available in Pulsar 2.8.0 or later.
To use the transaction API, you do not need any additional settings in the Pulsar client. By default, transactions is disabled.
Currently, transaction API is only available for Java clients. Support for other language clients will be added in the future releases.
This section provides an example of how to use the transaction API to send and receive messages in a Java client.
Start Pulsar 2.8.0 or later.
Change the configuration in the
//mandatory configuration, used to enable transaction coordinator
//mandtory configuration, used to create systemTopic used for transaction buffer snapshot
If you want to acknowledge batch messages in transactions, set
If you want to guarantee exactly-once semantics, you need to enable message deduplication. You can enable message deduplication at the broker level, the namespace level, or the topic level according to your needs.
Initialize transaction coordinator metadata.
The transaction coordinator can leverage the advantages of partitioned topics (such as load balance).
bin/pulsar initialize-transaction-coordinator-metadata -cs 127.0.0.1:2181 -c standalone
Transaction coordinator metadata setup success
Initialize a Pulsar client.
PulsarClient client = PulsarClient.builder()
Now you can start using the transaction API to send and receive messages. Below is an example of a
consume-process-produce application written in Java.
Let’s walk through this example step by step.
|1. Start a transaction.||The application opens a new transaction by calling PulsarClient.newTransaction. It specifics the transaction timeout as 1 minute. If the transaction is not committed within 1 minute, the transaction is automatically aborted.|
|2. Receive messages from topics.||The application creates two normal consumers to receive messages from topic input-topic-1 and input-topic-2 respectively.|
|3. Publish messages to topics with the transaction.||The application creates two producers to produce the resulting messages to the output topic output-topic-1 and output-topic-2 respectively. The application applies the processing logic and generates two output messages. The application sends those two output messages as part of the transaction opened in the first step via Producer.newMessage(Transaction).|
|4. Acknowledge the messages with the transaction.||In the same transaction, the application acknowledges the two input messages.|
|5. Commit the transaction.||The application commits the transaction by calling Transaction.commit() on the open transaction. The commit operation ensures the two input messages are marked as acknowledged and the two output messages are written successfully to the output topics.|
 Example of enabling batch messages ack in transactions in the consumer builder.
Consumer<byte> sinkConsumer = pulsarClient
.enableBatchIndexAcknowledgment(true) // enable batch index acknowledgement