Partitioned topics

Expand message throughput by distributing load within topics

By default, Pulsar topics are served by a single broker. Using only a single broker, however, limits a topic’s maximum throughput. Partitioned topics are a special type of topic that can span multiple brokers and thus allow for much higher throughput. For an explanation of how partitioned topics work, see the Concepts section below.

You can publish to partitioned topics using Pulsar’s client libraries and you can create and manage partitioned topics using Pulsar’s admin API.

Publishing to partitioned topics

When publishing to partitioned topics, the only difference from non-partitioned topics is that you need to specify a routing mode when you create a new producer. Examples for Java are below.

Java

Publishing messages to partitioned topics in the Java client works much like publishing to normal topics. The difference is that you need to specify either one of the currently available message routers or a custom router.

Routing mode

You can specify the routing mode in the ProducerConfiguration object that you use to configure your producer. You have three options:

  • SinglePartition
  • RoundRobinPartition
  • CustomPartition

Here’s an example:

String pulsarBrokerRootUrl = "pulsar://localhost:6650";
String topic = "persistent://my-property/my-cluster/my-namespace/my-topic";

PulsarClient client = PulsarClient.create(pulsarBrokerRootUrl);
ProducerConfiguration config = new ProducerConfiguration();
config.setMessageRoutingMode(ProducerConfiguration.MessageRoutingMode.SinglePartition);
Producer producer = client.createProducer(topic, config);
producer.send("Partitioned topic message".getBytes());

Custom message router

To use a custom message router, you need to provide an implementation of the MessageRouter interface, which has just one choosePartition method:

public interface MessageRouter extends Serializable {
    int choosePartition(Message msg);
}

Here’s a (not very useful!) router that routes every message to partition 10. Since partition indices start at 0, it only works on a topic that has at least 11 partitions:

public class AlwaysTenRouter implements MessageRouter {
    public int choosePartition(Message msg) {
        return 10;
    }
}
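Whatever a custom router returns must be an existing partition index, that is, a value from 0 to N - 1 for a topic with N partitions. The sketch below is a hypothetical stand-in, not Pulsar's MessageRouter. Because the choosePartition method shown above receives only the message, this class assumes it is handed the partition count in its constructor, and it wraps its preferred index into the valid range:

```java
// Hypothetical stand-in for a custom router (not Pulsar's MessageRouter).
// Assumption: the partition count is passed to the constructor, which the
// MessageRouter interface shown above does not provide.
class PreferredPartitionRouter {
    private final int preferred;
    private final int numPartitions;

    PreferredPartitionRouter(int preferred, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        this.preferred = preferred;
        this.numPartitions = numPartitions;
    }

    // Wraps the preferred index into the valid range [0, numPartitions).
    int choosePartition() {
        return Math.floorMod(preferred, numPartitions);
    }

    // True if index names an existing partition of a topic with numPartitions partitions.
    static boolean isValidPartition(int index, int numPartitions) {
        return index >= 0 && index < numPartitions;
    }

    public static void main(String[] args) {
        System.out.println(isValidPartition(10, 4));  // false: indices are 0 to 3
        System.out.println(isValidPartition(10, 11)); // true
        System.out.println(new PreferredPartitionRouter(10, 4).choosePartition()); // 2
    }
}
```

With the 4-partition topic used elsewhere on this page, AlwaysTenRouter would pick a partition that does not exist. The wrapped variant sends those messages to partition 2 instead.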

With that implementation in hand, you can send messages like this:

String pulsarBrokerRootUrl = "pulsar://localhost:6650";
String topic = "persistent://my-property/my-cluster/my-namespace/my-topic";

PulsarClient client = PulsarClient.create(pulsarBrokerRootUrl);
ProducerConfiguration config = new ProducerConfiguration();
config.setMessageRouter(new AlwaysTenRouter());
Producer producer = client.createProducer(topic, config);
producer.send("Partitioned topic message".getBytes());

Pulsar admin setup

Each of Pulsar’s three admin interfaces—the pulsar-admin CLI tool, the Java admin API, and the REST API—requires some special setup if you have authentication enabled in your Pulsar instance.

pulsar-admin

If you have authentication enabled, you will need to provide an auth configuration to use the pulsar-admin tool. By default, the configuration for the pulsar-admin tool is found in the conf/client.conf file. Here are the available parameters:

Name | Description | Default
webServiceUrl | The web URL for the cluster. | http://localhost:8080/
brokerServiceUrl | The Pulsar protocol URL for the cluster. | pulsar://localhost:6650/
authPlugin | The authentication plugin. |
authParams | The authentication parameters for the cluster, as a comma-separated string. |
useTls | Whether or not TLS authentication will be enforced in the cluster. | false
tlsAllowInsecureConnection | Whether to accept untrusted TLS certificates from the broker. | false
tlsTrustCertsFilePath | The path to the trusted TLS certificate file. |
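client.conf consists of key=value lines. The sketch below is a rough illustration of how such a file resolves against the defaults above: it uses java.util.Properties and falls back to the listed defaults. It is not the code pulsar-admin uses to load its configuration. The tlsAllowInsecureConnection default of false is taken from the Java admin example further down. Properties treats backslashes as escape characters, so treat this as an approximation. The auth plugin class name is the same placeholder used in that example.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Sketch: read client.conf-style key=value settings with java.util.Properties,
// falling back to the defaults from the table. Illustration only; this is not
// how pulsar-admin itself loads the file.
class ClientConfLoader {

    static Properties defaults() {
        Properties defaults = new Properties();
        defaults.setProperty("webServiceUrl", "http://localhost:8080/");
        defaults.setProperty("brokerServiceUrl", "pulsar://localhost:6650/");
        defaults.setProperty("useTls", "false");
        defaults.setProperty("tlsAllowInsecureConnection", "false");
        return defaults;
    }

    static Properties load(Reader reader) {
        Properties props = new Properties(defaults());
        try {
            props.load(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical file contents; com.org.MyAuthPluginClass is a placeholder.
        String conf = String.join("\n",
                "# Authentication settings",
                "authPlugin=com.org.MyAuthPluginClass",
                "authParams=param1=value1",
                "useTls=true");
        Properties props = load(new StringReader(conf));
        System.out.println(props.getProperty("webServiceUrl")); // http://localhost:8080/ (default)
        System.out.println(props.getProperty("authParams"));    // param1=value1
        System.out.println(props.getProperty("useTls"));        // true
    }
}
```

Properties splits each line at the first `=`, so a value such as `param1=value1` survives intact. To read the real file, pass a reader for conf/client.conf instead of the StringReader.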

REST API

You can find documentation for the REST API exposed by Pulsar brokers in this reference document.

Java admin client

To use the Java admin API, instantiate a PulsarAdmin object, specifying a URL for a Pulsar broker and a ClientConfiguration. Here’s a minimal example using localhost:

URL url = new URL("http://localhost:8080");
String authPluginClassName = "com.org.MyAuthPluginClass"; // Fully qualified auth plugin class name, if Pulsar security is enabled
String authParams = "param1=value1"; // Auth parameters, if the auth plugin class requires them
boolean useTls = false;
boolean tlsAllowInsecureConnection = false;
String tlsTrustCertsFilePath = null;

ClientConfiguration config = new ClientConfiguration();
config.setAuthentication(authPluginClassName, authParams);
config.setUseTls(useTls);
config.setTlsAllowInsecureConnection(tlsAllowInsecureConnection);
config.setTlsTrustCertsFilePath(tlsTrustCertsFilePath);

PulsarAdmin admin = new PulsarAdmin(url, config);

Managing partitioned topics

You can use Pulsar’s admin API to create and manage partitioned topics.

In all of the instructions and commands below, the topic name structure is:

persistent://property/cluster/namespace/topic
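The following hypothetical helper shows how the four segments of that structure break down. It is not part of Pulsar's API. It also flags a name with a missing segment, and it treats a cluster segment of "global" as marking a global topic, as described under "Global partitioned topics":

```java
// Hypothetical helper (not part of Pulsar's API) that splits a name of the
// form persistent://property/cluster/namespace/topic into its four parts.
final class TopicNameParts {
    private static final String PREFIX = "persistent://";

    final String property;
    final String cluster;
    final String namespace;
    final String topic;

    private TopicNameParts(String property, String cluster, String namespace, String topic) {
        this.property = property;
        this.cluster = cluster;
        this.namespace = namespace;
        this.topic = topic;
    }

    static TopicNameParts parse(String name) {
        if (!name.startsWith(PREFIX)) {
            throw new IllegalArgumentException("Not a persistent:// topic name: " + name);
        }
        // Limit 4 keeps any further slashes inside the last (topic) part.
        String[] parts = name.substring(PREFIX.length()).split("/", 4);
        if (parts.length < 4) {
            throw new IllegalArgumentException(
                    "Expected property/cluster/namespace/topic: " + name);
        }
        for (String part : parts) {
            if (part.isEmpty()) {
                throw new IllegalArgumentException("Empty name segment in: " + name);
            }
        }
        return new TopicNameParts(parts[0], parts[1], parts[2], parts[3]);
    }

    // A cluster segment of "global" marks a global topic.
    boolean isGlobal() {
        return "global".equals(cluster);
    }

    public static void main(String[] args) {
        TopicNameParts t = parse("persistent://my-property/my-cluster/my-namespace/my-topic");
        // prints: my-property | my-cluster | my-namespace | my-topic
        System.out.println(t.property + " | " + t.cluster + " | " + t.namespace + " | " + t.topic);
    }
}
```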

Create

Partitioned topics in Pulsar must be explicitly created. When creating a new partitioned topic you need to provide a name for the topic as well as the desired number of partitions.

Global partitioned topics

If you’d like to create a global partitioned topic, you need to create a partitioned topic using the instructions here and specify global as the cluster in the topic name.

pulsar-admin

You can create partitioned topics using the create-partitioned-topic command. Specify the topic name as an argument and the number of partitions using the -p or --partitions flag. Here’s an example:

$ bin/pulsar-admin persistent create-partitioned-topic \
  persistent://my-property/my-cluster/my-namespace/my-topic \
  --partitions 4

REST API

PUT /admin/persistent/:property/:cluster/:namespace/:destination/partitions
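If you call the REST endpoint directly instead of going through pulsar-admin, the request looks roughly like the one below. This sketch uses the JDK's java.net.http client (Java 11+) to build the request without sending it. Two details are assumptions, not confirmed by this page: the broker web service address, and that the partition count goes in the request body as a JSON number.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Sketch: build (but do not send) the REST call that creates a partitioned
// topic. Assumptions: the web service URL, and that the partition count is
// sent as a JSON number in the request body.
class CreatePartitionedTopicRequest {

    static HttpRequest build(String webServiceUrl, String property, String cluster,
                             String namespace, String topic, int numPartitions) {
        String path = String.format("/admin/persistent/%s/%s/%s/%s/partitions",
                property, cluster, namespace, topic);
        return HttpRequest.newBuilder(URI.create(webServiceUrl + path))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(Integer.toString(numPartitions)))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = build("http://localhost:8080",
                "my-property", "my-cluster", "my-namespace", "my-topic", 4);
        // prints: PUT http://localhost:8080/admin/persistent/my-property/my-cluster/my-namespace/my-topic/partitions
        System.out.println(request.method() + " " + request.uri());
        // Sending it requires a running broker, e.g.:
        // HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    }
}
```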

Java

String topicName = "persistent://my-property/my-cluster/my-namespace/my-topic";
int numPartitions = 4;
admin.persistentTopics().createPartitionedTopic(topicName, numPartitions);

Get metadata

Partitioned topics have metadata associated with them that you can fetch as a JSON object. The following metadata fields are currently available:

Field Meaning
partitions The number of partitions into which the topic is divided

pulsar-admin

You can see the number of partitions in a partitioned topic using the get-partitioned-topic-metadata subcommand. Here’s an example:

$ pulsar-admin persistent get-partitioned-topic-metadata \
  persistent://my-property/my-cluster/my-namespace/my-topic
{
  "partitions": 4
}

REST API

GET /admin/persistent/:property/:cluster/:namespace/:destination/partitions

Java

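String topicName = "persistent://my-property/my-cluster/my-namespace/my-topic";
PartitionedTopicMetadata metadata = admin.persistentTopics().getPartitionedTopicMetadata(topicName);
int numPartitions = metadata.partitions; // e.g. 4 for the topic created above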

Update

You can update the number of partitions on an existing partitioned topic if the topic is non-global. To update, the new number of partitions must be greater than the existing number.

Decreasing the number of partitions would require deleting some of the topic’s partitions, which Pulsar does not support.

Partitioned producers and consumers that already exist can’t see newly created partitions. The application has to recreate them so that the new producers and consumers also connect to the added partitions. As a result, partition ordering at the producers can be violated until all producers have been restarted.
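The following hypothetical helper restates the update rule in code. It is not Pulsar code, and it assumes partitions are indexed 0 to N - 1. The new count must be strictly greater than the current one, and the partitions an update adds are the indices from the old count up to the new count minus one:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the update rule: the partition count may only grow, and the
// added partitions are indices currentPartitions .. requestedPartitions - 1.
// Hypothetical helper, not Pulsar code.
class PartitionUpdate {

    static List<Integer> addedPartitions(int currentPartitions, int requestedPartitions) {
        if (requestedPartitions <= currentPartitions) {
            throw new IllegalArgumentException(
                    "New partition count " + requestedPartitions
                    + " must be greater than the current count " + currentPartitions);
        }
        List<Integer> added = new ArrayList<>();
        for (int i = currentPartitions; i < requestedPartitions; i++) {
            added.add(i);
        }
        return added;
    }

    public static void main(String[] args) {
        // Going from 4 to 8 partitions, as in the pulsar-admin example.
        System.out.println(addedPartitions(4, 8)); // [4, 5, 6, 7]
    }
}
```

Producers created before the update only know about partitions 0 through 3. If a router derives the partition from the partition count, for example by hashing a key modulo N, the same key can map to a different partition once producers see 8 partitions. That is one way ordering can break during the transition.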

pulsar-admin

Partitioned topics can be updated using the update-partitioned-topic command.

$ pulsar-admin persistent update-partitioned-topic \
  persistent://my-property/my-cluster/my-namespace/my-topic \
  --partitions 8

REST API

POST /admin/persistent/:property/:cluster/:namespace/:destination/partitions

Java

admin.persistentTopics().updatePartitionedTopic(persistentTopic, numPartitions);

Delete

pulsar-admin

Partitioned topics can be deleted using the delete-partitioned-topic command, specifying the topic by name:

$ bin/pulsar-admin persistent delete-partitioned-topic \
  persistent://my-property/my-cluster/my-namespace/my-topic

REST API

DELETE /admin/persistent/:property/:cluster/:namespace/:destination/partitions

Java

admin.persistentTopics().deletePartitionedTopic(persistentTopic);

List

This lists the persistent topics that exist under a given namespace.

pulsar-admin

$ pulsar-admin persistent list prop-1/cluster-1/namespace
persistent://property/cluster/namespace/topic
persistent://property/cluster/namespace/topic
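A listing may include the internal topics that back each partitioned topic. This sketch counts them per parent topic, assuming those internal topics are named `<topic>-partition-<index>`. Both the helper and that naming assumption are illustrative:

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch: count partitions per parent topic in a topic listing, assuming
// internal partitions are named <topic>-partition-<index>.
class PartitionGrouper {
    private static final Pattern PARTITION = Pattern.compile("^(.*)-partition-(\\d+)$");

    static Map<String, Integer> countPartitions(List<String> topics) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String topic : topics) {
            Matcher m = PARTITION.matcher(topic);
            if (m.matches()) {
                counts.merge(m.group(1), 1, Integer::sum); // non-partition topics are skipped
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> listed = List.of(
                "persistent://prop-1/cluster-1/namespace/my-topic-partition-0",
                "persistent://prop-1/cluster-1/namespace/my-topic-partition-1",
                "persistent://prop-1/cluster-1/namespace/plain-topic");
        // prints: {persistent://prop-1/cluster-1/namespace/my-topic=2}
        System.out.println(countPartitions(listed));
    }
}
```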

REST API

GET /admin/persistent/:property/:cluster/:namespace

Java

admin.persistentTopics().getList(namespace);

Stats

This shows the current statistics of a given partitioned topic. Here’s an example payload:

{
  "msgRateIn": 4641.528542257553,
  "msgThroughputIn": 44663039.74947473,
  "msgRateOut": 0,
  "msgThroughputOut": 0,
  "averageMsgSize": 1232439.816728665,
  "storageSize": 135532389160,
  "publishers": [
    {
      "msgRateIn": 57.855383881403576,
      "msgThroughputIn": 558994.7078932219,
      "averageMsgSize": 613135,
      "producerId": 0,
      "producerName": null,
      "address": null,
      "connectedSince": null
    }
  ],
  "subscriptions": {
    "my-topic_subscription": {
      "msgRateOut": 0,
      "msgThroughputOut": 0,
      "msgBacklog": 116632,
      "type": null,
      "msgRateExpired": 36.98245516804671,
      "consumers": []
    }
  },
  "replication": {}
}

The following stats are available:

Stat Description
msgRateIn The sum of all local and replication publishers’ publish rates in messages per second
msgThroughputIn Same as msgRateIn but in bytes per second instead of messages per second
msgRateOut The sum of all local and replication consumers’ dispatch rates in messages per second
msgThroughputOut Same as msgRateOut but in bytes per second instead of messages per second
averageMsgSize Average message size, in bytes, from this publisher within the last interval
storageSize The sum of the ledgers’ storage size for this topic
publishers The list of all local publishers into the topic. There can be anywhere from zero to thousands.
producerId Internal identifier for this producer on this topic
producerName Internal identifier for this producer, generated by the client library
address IP address and source port for the connection of this producer
connectedSince Timestamp this producer was created or last reconnected
subscriptions The list of all local subscriptions to the topic
my-subscription The name of this subscription (client defined)
msgBacklog The count of messages in backlog for this subscription
type The subscription type
msgRateExpired The rate at which messages were discarded instead of dispatched from this subscription due to TTL
consumers The list of connected consumers for this subscription
consumerName Internal identifier for this consumer, generated by the client library
availablePermits The number of messages this consumer has space for in the client library’s listen queue. A value of 0 means the client library’s queue is full and receive() isn’t being called. A nonzero value means this consumer is ready to be dispatched messages.
replication This section gives the stats for cross-colo replication of this topic
replicationBacklog The outbound replication backlog in messages
connected Whether the outbound replicator is connected
replicationDelayInSeconds How long the oldest message has been waiting to be sent through the connection, if connected is true
inboundConnection The IP and port of the broker in the remote cluster’s publisher connection to this broker
inboundConnectedSince The TCP connection being used to publish messages to the remote cluster. If there are no local publishers connected, this connection is automatically closed after a minute.
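Several of these fields are described as sums. The sketch below shows topic-level totals built by adding up the values of each partition. It is a minimal illustration that assumes the partitioned-topic figures are plain sums of the per-partition values. PartitionStats is a hypothetical stand-in for a few of the JSON fields, not a Pulsar class.

```java
import java.util.Arrays;
import java.util.List;

// Sketch: topic-level totals computed by summing per-partition values.
// PartitionStats is a hypothetical stand-in for a few fields of the JSON above.
class StatsAggregation {

    static final class PartitionStats {
        final double msgRateIn;       // messages per second
        final double msgThroughputIn; // bytes per second
        final long storageSize;       // bytes

        PartitionStats(double msgRateIn, double msgThroughputIn, long storageSize) {
            this.msgRateIn = msgRateIn;
            this.msgThroughputIn = msgThroughputIn;
            this.storageSize = storageSize;
        }
    }

    static PartitionStats sum(List<PartitionStats> partitions) {
        double rateIn = 0;
        double throughputIn = 0;
        long storage = 0;
        for (PartitionStats p : partitions) {
            rateIn += p.msgRateIn;
            throughputIn += p.msgThroughputIn;
            storage += p.storageSize;
        }
        return new PartitionStats(rateIn, throughputIn, storage);
    }

    public static void main(String[] args) {
        PartitionStats total = sum(Arrays.asList(
                new PartitionStats(1.5, 1500.0, 1000L),
                new PartitionStats(2.5, 2500.0, 3000L)));
        // prints: 4.0 msg/s, 4000 bytes
        System.out.println(total.msgRateIn + " msg/s, " + total.storageSize + " bytes");
    }
}
```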

pulsar-admin

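You can fetch these stats using the partitioned-stats subcommand. The --per-partition flag also includes the stats of each individual partition:

$ pulsar-admin persistent partitioned-stats \
  persistent://test-property/cl1/ns1/tp1 \
  --per-partition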

REST API

GET /admin/persistent/:property/:cluster/:namespace/:destination/partitioned-stats

Java

admin.persistentTopics().getPartitionedStats(persistentTopic, true);

Internal stats

This shows detailed internal statistics of a topic.

Stat Description
entriesAddedCounter Messages published since this broker loaded this topic
numberOfEntries Total number of messages being tracked
totalSize Total storage size in bytes of all messages
currentLedgerEntries Count of messages written to the ledger currently open for writing
currentLedgerSize Size in bytes of messages written to ledger currently open for writing
lastLedgerCreatedTimestamp Time when last ledger was created
lastLedgerCreationFailureTimestamp Time when the last ledger creation failed
waitingCursorsCount How many cursors are caught up and waiting for a new message to be published
pendingAddEntriesCount How many messages have (asynchronous) write requests that are still waiting for completion
lastConfirmedEntry The ledgerid:entryid of the last message successfully written. If the entryid is -1, then the ledger has been opened or is currently being opened but has no entries written yet.
state The state of the cursor ledger. Open means we have a cursor ledger for saving updates of the markDeletePosition.
ledgers The ordered list of all ledgers for this topic holding its messages
cursors The list of all cursors on this topic. There will be one for every subscription you saw in the topic stats.
markDeletePosition The ack position: the last message the subscriber acknowledged receiving
readPosition The latest position of subscriber for reading message
waitingReadOp This is true when the subscription has read the latest message published to the topic and is waiting on new messages to be published.
pendingReadOps The counter for how many outstanding read requests to the BookKeepers we have in progress
messagesConsumedCounter Number of messages this cursor has acked since this broker loaded this topic
cursorLedger The ledger being used to persistently store the current markDeletePosition
cursorLedgerLastEntry The last entryid used to persistently store the current markDeletePosition
individuallyDeletedMessages If Acks are being done out of order, shows the ranges of messages Acked between the markDeletePosition and the read-position
lastLedgerSwitchTimestamp The last time the cursor ledger was rolled over

Here’s an example payload:

{
  "entriesAddedCounter": 20449518,
  "numberOfEntries": 3233,
  "totalSize": 331482,
  "currentLedgerEntries": 3233,
  "currentLedgerSize": 331482,
  "lastLedgerCreatedTimestamp": "2016-06-29 03:00:23.825",
  "lastLedgerCreationFailureTimestamp": null,
  "waitingCursorsCount": 1,
  "pendingAddEntriesCount": 0,
  "lastConfirmedEntry": "324711539:3232",
  "state": "LedgerOpened",
  "ledgers": [
    {
      "ledgerId": 324711539,
      "entries": 0,
      "size": 0
    }
  ],
  "cursors": {
    "my-subscription": {
      "markDeletePosition": "324711539:3133",
      "readPosition": "324711539:3233",
      "waitingReadOp": true,
      "pendingReadOps": 0,
      "messagesConsumedCounter": 20449501,
      "cursorLedger": 324702104,
      "cursorLedgerLastEntry": 21,
      "individuallyDeletedMessages": "[(324711539:3134..324711539:3136], (324711539:3137..324711539:3140], ]",
      "lastLedgerSwitchTimestamp": "2016-06-29 01:30:19.313",
      "state": "Open"
    }
  }
}
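Positions such as markDeletePosition, readPosition, and lastConfirmedEntry are written as ledgerId:entryId. The following hypothetical helper parses and compares them. It is not a Pulsar class, and it assumes ledger IDs increase over a topic's lifetime. In the example above, 99 entries lie after the mark-delete position up to the last confirmed entry. Some of those are already individually acknowledged, as individuallyDeletedMessages shows.

```java
// Sketch: parse and compare the "ledgerId:entryId" positions that appear in
// the internal stats. LedgerPosition is a hypothetical helper, not a Pulsar class.
final class LedgerPosition implements Comparable<LedgerPosition> {
    final long ledgerId;
    final long entryId;

    LedgerPosition(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }

    static LedgerPosition parse(String text) {
        int colon = text.indexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException("Expected ledgerId:entryId, got " + text);
        }
        return new LedgerPosition(
                Long.parseLong(text.substring(0, colon)),
                Long.parseLong(text.substring(colon + 1)));
    }

    // An entryId of -1 means the ledger is open but has no entries written yet.
    boolean hasNoEntries() {
        return entryId == -1;
    }

    // Orders by ledger first (assuming ledger IDs increase), then by entry.
    @Override
    public int compareTo(LedgerPosition other) {
        int byLedger = Long.compare(ledgerId, other.ledgerId);
        return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
    }

    // Number of entries after `from`, up to and including `to`, within one ledger.
    static long entriesAfter(LedgerPosition from, LedgerPosition to) {
        if (from.ledgerId != to.ledgerId) {
            throw new IllegalArgumentException("Positions are in different ledgers");
        }
        return to.entryId - from.entryId;
    }

    public static void main(String[] args) {
        LedgerPosition markDelete = parse("324711539:3133");
        LedgerPosition lastConfirmed = parse("324711539:3232");
        // prints: 99
        System.out.println(entriesAfter(markDelete, lastConfirmed));
    }
}
```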

pulsar-admin

$ pulsar-admin persistent stats-internal \
  persistent://test-property/cl1/ns1/tp1

REST API

GET /admin/persistent/:property/:cluster/:namespace/:destination/internalStats

Java

admin.persistentTopics().getInternalStats(persistentTopic);

Concepts

Normal topics can be served only by a single broker, which limits the topic’s maximum throughput. Partitioned topics are a special type of topic that can be handled by multiple brokers, which allows for much higher throughput.

Behind the scenes, a partitioned topic is actually implemented as N internal topics, where N is the number of partitions. When publishing messages to a partitioned topic, each message is routed to one of these partitions, and different partitions can be served by different brokers. The distribution of partitions across brokers is handled automatically by Pulsar.

The diagram below illustrates this:

[Figure: Partitioned Topic]

Here, the topic T1 has five partitions (P0 through P4) split across three brokers. Because there are more partitions than brokers, two brokers handle two partitions apiece, while the third handles only one (again, Pulsar handles this distribution of partitions automatically).
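Each partition in the diagram is an ordinary internal topic. This sketch derives the internal topic names for T1. It assumes the `<topic>-partition-<index>` naming convention, and the property, cluster, and namespace in the example are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: derive the internal topic names behind a partitioned topic,
// assuming the "<topic>-partition-<index>" naming convention.
class PartitionNames {

    static List<String> internalTopics(String partitionedTopic, int numPartitions) {
        List<String> names = new ArrayList<>(numPartitions);
        for (int i = 0; i < numPartitions; i++) {
            names.add(partitionedTopic + "-partition-" + i);
        }
        return names;
    }

    public static void main(String[] args) {
        // T1 from the diagram, with a hypothetical property/cluster/namespace.
        for (String name : internalTopics("persistent://my-property/my-cluster/my-namespace/T1", 5)) {
            System.out.println(name);
        }
    }
}
```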

Messages for this topic are broadcast to two consumers. The routing mode determines which partition each message is published to (and therefore which broker handles it), while the subscription mode determines which messages go to which consumers.

Decisions about routing and subscription modes can be made separately in most cases. Throughput concerns should guide partitioning/routing decisions while subscription decisions should be guided by application semantics.

There is no difference between partitioned topics and normal topics in terms of how subscription modes work, because partitioning only determines what happens between the time a message is published by a producer and the time it is processed and acknowledged by a consumer.

Partitioned topics need to be explicitly created via the admin API. The number of partitions can be specified when creating the topic.

Routing modes

When publishing to partitioned topics, you must specify a routing mode. The routing mode determines which partition—that is, which internal topic—each message should be published to.

There are three routing modes available by default:

Mode | Description | Ordering guarantee
Key hash | If a key property has been specified on the message, the partitioned producer hashes the key and assigns the message to a particular partition. | Per-key-bucket ordering
Single default partition | If no key is provided, each producer’s messages are routed to a dedicated partition, initially selected at random. | Per-producer ordering
Round robin distribution | If no key is provided, messages are routed to different partitions in round-robin fashion to achieve maximum throughput. | None

In addition to these default modes, you can also create a custom routing mode if you’re using the Java client by implementing the MessageRouter interface.
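The sketch below models the decisions in the routing table in plain Java. All names in it are hypothetical, and it is not Pulsar's implementation. Keyed messages are hashed with String.hashCode() and a sign-safe modulo, and Pulsar's actual hash function may differ. Messages without a key go either to a partition picked once at random per producer or round robin through all partitions:

```java
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the default routing decisions in the table above. Keys are hashed
// with String.hashCode() and a sign-safe modulo; Pulsar's real hash may differ.
// All names here are hypothetical.
class DefaultRoutingSketch {

    enum Mode { SINGLE_PARTITION, ROUND_ROBIN }

    private final int numPartitions;
    private final Mode mode;
    private final int dedicatedPartition; // picked once, at random, per producer
    private final AtomicInteger nextRoundRobin = new AtomicInteger();

    DefaultRoutingSketch(int numPartitions, Mode mode, Random random) {
        this.numPartitions = numPartitions;
        this.mode = mode;
        this.dedicatedPartition = random.nextInt(numPartitions);
    }

    // key is null when the message has no key.
    int choosePartition(String key) {
        if (key != null) {
            // Key hash: the same key always lands on the same partition,
            // which is what gives per-key ordering.
            return Math.floorMod(key.hashCode(), numPartitions);
        }
        if (mode == Mode.SINGLE_PARTITION) {
            return dedicatedPartition;
        }
        // Round robin; floorMod keeps the index valid even after int overflow.
        return Math.floorMod(nextRoundRobin.getAndIncrement(), numPartitions);
    }

    public static void main(String[] args) {
        DefaultRoutingSketch router = new DefaultRoutingSketch(4, Mode.ROUND_ROBIN, new Random());
        System.out.println(router.choosePartition("user-42") == router.choosePartition("user-42")); // true
        System.out.println(router.choosePartition(null)); // 0
        System.out.println(router.choosePartition(null)); // 1
    }
}
```

Keyed messages never advance the round-robin counter in this sketch, so the key-hash and round-robin paths stay independent.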