Skip to main content

Partitioned topics

By default, Pulsar topics are served by a single broker. Using only a single broker limits a topic's maximum throughput. Partitioned topics are a special type of topic that can span multiple brokers and thus allow for much higher throughput. For an explanation of how partitioned topics work, see the Partitioned Topics concepts.

You can publish to partitioned topics using Pulsar client libraries and you can create and manage partitioned topics using Pulsar admin API.

Publish to partitioned topics

When publishing to partitioned topics, you do not need to explicitly specify a routing mode when you create a new producer. If you do not specify a routing mode, the round robin route mode is used. Take Java as an example.

Publishing messages to partitioned topics in the Java client works much like publishing to normal topics. The difference is that you need to specify either one of the currently available message routers or a custom router.

Routing mode

You can specify the routing mode in the ProducerConfiguration object that you use to configure your producer. Three options are available:

  • SinglePartition
  • RoundRobinPartition
  • CustomPartition

The following is an example:


String pulsarBrokerRootUrl = "pulsar://localhost:6650";
String topic = "persistent://my-tenant/my-namespace/my-topic";

PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarBrokerRootUrl).build();
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
producer.send("Partitioned topic message".getBytes());

Custom message router

To use a custom message router, you need to provide an implementation of the MessageRouter interface, which has just one choosePartition method:


public interface MessageRouter extends Serializable {
int choosePartition(Message msg);
}

The following router routes every message to partition 10:


public class AlwaysTenRouter implements MessageRouter {
public int choosePartition(Message msg) {
return 10;
}
}

With that implementation in hand, you can send


String pulsarBrokerRootUrl = "pulsar://localhost:6650";
String topic = "persistent://my-tenant/my-cluster-my-namespace/my-topic";

PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarBrokerRootUrl).build();
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.messageRouter(new AlwaysTenRouter())
.create();
producer.send("Partitioned topic message".getBytes());

How to choose partitions when using a key

If a message has a key, it supersedes the round robin routing policy. The following example illustrates how to choose partition when you use a key.


// If the message has a key, it supersedes the round robin routing policy
if (msg.hasKey()) {
return signSafeMod(hash.makeHash(msg.getKey()), topicMetadata.numPartitions());
}

if (isBatchingEnabled) { // if batching is enabled, choose partition on `partitionSwitchMs` boundary.
long currentMs = clock.millis();
return signSafeMod(currentMs / partitionSwitchMs + startPtnIdx, topicMetadata.numPartitions());
} else {
return signSafeMod(PARTITION_INDEX_UPDATER.getAndIncrement(this), topicMetadata.numPartitions());
}

Manage partitioned topics

You can use Pulsar admin API to create and manage partitioned topics.