Skip to main content
Version: Next

Advanced features

You can use the following advanced features with transactions in Pulsar.

Ack batch messages

If you want to acknowledge batch messages with transactions, set acknowledgmentAtBatchIndexLevelEnabled to true in the broker.conf or standalone.conf file.

acknowledgmentAtBatchIndexLevelEnabled=true

This example enables batch messages ack in transactions in the consumer builder.

Consumer<byte[]> consumer = pulsarClient
.newConsumer()
.topic(transferTopic)
.subscriptionName("transaction-sub")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionType(SubscriptionType.Shared)
.enableBatchIndexAcknowledgment(true) // enable batch index acknowledgment
.subscribe();

Enable authentication

If you want to enable authentication with transactions, follow the steps below.

  1. Grant "consume" permission to the persistent://pulsar/system/transaction_coordinator_assign topic.

  2. Configure authentication in a Pulsar client.

Select transaction isolation level

To enhance the flexibility of Pulsar transactions, they support two distinct isolation levels:

  • READ_COMMITTED(default): The consumer can only consume all transactional messages that have been committed.
  • READ_UNCOMMITTED: The consumer can consume all messages, even transactional messages that have been aborted.

For different scenarios, they use different subscriptions and choose different isolation levels. One scenario might require transactions, while another might not. In general, not all subscriptions of the same topic require transaction guarantees. Some want low latency without the exact-once semantic guarantee (like a real-time monitoring system), and some must require the exactly-once guarantee (e.g., business processing systems). Users can freely choose different isolation levels according to different scenarios.

Note that this is a subscription-level configuration, and all consumers under the same subscription must be configured with the same isolation level.

In this example, the consumer builder uses the READ_UNCOMMITTED isolation level.

Consumer<String> consumer = client
.newConsumer(Schema.STRING)
.topic("persistent://my-tenant/my-namespace/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.subscriptionIsolationLevel(SubscriptionIsolationLevel.READ_UNCOMMITTED) // Adding the isolation level configuration
.subscribe();

Guarantee exactly-once semantics

If you want to guarantee exactly-once semantics with transactions, you can enable message deduplication at the broker, namespace, or topic level.