Skip to main content

Manage topics

tip

This page only shows some frequently used operations.

  • For the latest and complete information about Pulsar admin, including commands, flags, descriptions, and more, see Pulsar admin docs.

  • For the latest and complete information about REST API, including parameters, responses, samples, and more, see REST API doc.

  • For the latest and complete information about Java admin API, including classes, methods, descriptions, and more, see Java admin API doc.

Pulsar has persistent and non-persistent topics. A persistent topic is a logical endpoint for publishing and consuming messages. The topic name structure for persistent topics is:

persistent://tenant/namespace/topic

Non-persistent topics are used in applications that only consume real-time published messages and do not need persistent guarantees. In this way, it reduces message-publish latency by removing overhead of persisting messages. The topic name structure for non-persistent topics is:

non-persistent://tenant/namespace/topic

Manage topic resources​

Whether it is a persistent or non-persistent topic, you can obtain the topic resources through pulsar-admin tool, REST API and Java.

note

In REST API, :schema stands for persistent or non-persistent. :tenant, :namespace, :x are variables, replace them with the real tenant, namespace, and x names when using them. Take GET /admin/v2/:schema/:tenant/:namespace/getList as an example, to get the list of persistent topics in REST API, use https://pulsar.apache.org/admin/v2/persistent/my-tenant/my-namespace. To get the list of non-persistent topics in REST API, use https://pulsar.apache.org/admin/v2/non-persistent/my-tenant/my-namespace.

List of topics​

You can get the list of topics under a given namespace in the following ways.

pulsar-admin topics list my-tenant/my-namespace

Grant permission​

You can grant permissions on a client role to perform specific actions on a given topic in the following ways.

pulsar-admin topics grant-permission \
--actions produce,consume \
--role application1 \
persistent://test-tenant/ns1/tp1

Get permission​

You can fetch permission in the following ways.

pulsar-admin topics permissions persistent://test-tenant/ns1/tp1

Example output:

application1    [consume, produce]

Revoke permission​

You can revoke permissions granted on a client role in the following ways.

pulsar-admin topics revoke-permission \
--role application1 \
persistent://test-tenant/ns1/tp1

Delete topic​

You can delete a topic in the following ways. You cannot delete a topic if any active subscription or producer is connected to the topic.

pulsar-admin topics delete persistent://test-tenant/ns1/tp1

Unload topic​

You can unload a topic in the following ways.

pulsar-admin topics unload persistent://test-tenant/ns1/tp1

Get stats​

For the detailed statistics of a topic, see Pulsar statistics.

The following is an example of a topic status.

{
"msgRateIn" : 0.0,
"msgThroughputIn" : 0.0,
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesInCounter" : 504,
"msgInCounter" : 9,
"bytesOutCounter" : 2296,
"msgOutCounter" : 41,
"averageMsgSize" : 0.0,
"msgChunkPublished" : false,
"storageSize" : 504,
"backlogSize" : 0,
"filteredEntriesCount" : 100,
"earliestMsgPublishTimeInBacklogs": 0,
"offloadedStorageSize" : 0,
"publishers" : [ {
"accessMode" : "Shared",
"msgRateIn" : 0.0,
"msgThroughputIn" : 0.0,
"averageMsgSize" : 0.0,
"chunkedMessageRate" : 0.0,
"producerId" : 0,
"metadata" : { },
"address" : "/127.0.0.1:65402",
"connectedSince" : "2021-06-09T17:22:55.913+08:00",
"clientVersion" : "2.9.0-SNAPSHOT",
"producerName" : "standalone-1-0"
} ],
"waitingPublishers" : 0,
"subscriptions" : {
"sub-demo" : {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 2296,
"msgOutCounter" : 41,
"msgRateRedeliver" : 0.0,
"chunkedMessageRate" : 0,
"msgBacklog" : 0,
"backlogSize" : 0,
"earliestMsgPublishTimeInBacklog": 0,
"msgBacklogNoDelayed" : 0,
"blockedSubscriptionOnUnackedMsgs" : false,
"msgDelayed" : 0,
"unackedMessages" : 0,
"type" : "Exclusive",
"activeConsumerName" : "20b81",
"msgRateExpired" : 0.0,
"totalMsgExpired" : 0,
"lastExpireTimestamp" : 0,
"lastConsumedFlowTimestamp" : 1623230565356,
"lastConsumedTimestamp" : 1623230583946,
"lastAckedTimestamp" : 1623230584033,
"lastMarkDeleteAdvancedTimestamp" : 1623230584033,
"filterProcessedMsgCount": 100,
"filterAcceptedMsgCount": 100,
"filterRejectedMsgCount": 0,
"filterRescheduledMsgCount": 0,
"consumers" : [ {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 2296,
"msgOutCounter" : 41,
"msgRateRedeliver" : 0.0,
"chunkedMessageRate" : 0.0,
"consumerName" : "20b81",
"availablePermits" : 959,
"unackedMessages" : 0,
"avgMessagesPerEntry" : 314,
"blockedConsumerOnUnackedMsgs" : false,
"lastAckedTimestamp" : 1623230584033,
"lastConsumedTimestamp" : 1623230583946,
"metadata" : { },
"address" : "/127.0.0.1:65172",
"connectedSince" : "2021-06-09T17:22:45.353+08:00",
"clientVersion" : "2.9.0-SNAPSHOT"
} ],
"allowOutOfOrderDelivery": false,
"consumersAfterMarkDeletePosition" : { },
"nonContiguousDeletedMessagesRanges" : 0,
"nonContiguousDeletedMessagesRangesSerializedSize" : 0,
"durable" : true,
"replicated" : false
}
},
"replication" : { },
"deduplicationStatus" : "Disabled",
"nonContiguousDeletedMessagesRanges" : 0,
"nonContiguousDeletedMessagesRangesSerializedSize" : 0,
"ownerBroker" : "localhost:8080"
}

To get the status of a topic, you can use the following ways.

pulsar-admin topics stats persistent://test-tenant/ns1/tp1

Get internal stats​

For the detailed internal statistics inside a topic, see Pulsar statistics.

The following is an example of the internal statistics of a topic.

{
"entriesAddedCounter":0,
"numberOfEntries":0,
"totalSize":0,
"currentLedgerEntries":0,
"currentLedgerSize":0,
"lastLedgerCreatedTimestamp":"2021-01-22T21:12:14.868+08:00",
"lastLedgerCreationFailureTimestamp":null,
"waitingCursorsCount":0,
"pendingAddEntriesCount":0,
"lastConfirmedEntry":"3:-1",
"state":"LedgerOpened",
"ledgers":[
{
"ledgerId":3,
"entries":0,
"size":0,
"offloaded":false,
"metadata":null
}
],
"cursors":{
"test":{
"markDeletePosition":"3:-1",
"readPosition":"3:-1",
"waitingReadOp":false,
"pendingReadOps":0,
"messagesConsumedCounter":0,
"cursorLedger":4,
"cursorLedgerLastEntry":1,
"individuallyDeletedMessages":"[]",
"lastLedgerSwitchTimestamp":"2021-01-22T21:12:14.966+08:00",
"state":"Open",
"numberOfEntriesSinceFirstNotAckedMessage":0,
"totalNonContiguousDeletedMessagesRange":0,
"properties":{

}
}
},
"schemaLedgers":[
{
"ledgerId":1,
"entries":11,
"size":10,
"offloaded":false,
"metadata":null
}
],
"compactedLedger":{
"ledgerId":-1,
"entries":-1,
"size":-1,
"offloaded":false,
"metadata":null
}
}

To get the internal status of a topic, you can use the following ways.

pulsar-admin topics stats-internal persistent://test-tenant/ns1/tp1

Peek messages​

You can peek a number of messages for a specific subscription of a given topic in the following ways.

pulsar-admin topics peek-messages \
--count 10 --subscription my-subscription \
persistent://test-tenant/ns1/tp1

Example output:

Message ID: 77:2
Publish time: 1668674963028
Event time: 0
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 2d 31 |hello-1 |
+--------+-------------------------------------------------+----------------+

Get message by ID​

You can fetch the message with the given ledger ID and entry ID in the following ways.

pulsar-admin topics get-message-by-id \
-l 10 -e 0 persistent://public/default/my-topic

Examine messages​

You can examine a specific message on a topic by position relative to the earliest or the latest message.

pulsar-admin topics examine-messages \
-i latest -m 1 persistent://public/default/my-topic

Get message ID​

You can get message ID published at or just after the given datetime.

pulsar-admin topics get-message-id \
persistent://public/default/my-topic \
-d 2021-06-28T19:01:17Z

Skip messages​

You can skip a number of messages for a specific subscription of a given topic in the following ways.

pulsar-admin topics skip \
--count 10 --subscription my-subscription \
persistent://test-tenant/ns1/tp1

Skip all messages​

You can skip all the old messages for a specific subscription of a given topic.

pulsar-admin topics clear-backlog \
--subscription my-subscription \
persistent://test-tenant/ns1/tp1

Reset cursor​

You can reset a subscription cursor position back to the position which is recorded X minutes before. It essentially calculates the time and position of the cursor at X minutes before and resets it at that position. You can reset the cursor in the following ways.

pulsar-admin topics reset-cursor \
--subscription my-subscription --time 10 \
persistent://test-tenant/ns1/tp1

Look up topic's owner broker​

You can locate the owner broker of the given topic in the following ways.

pulsar-admin topics lookup persistent://test-tenant/ns1/tp1

Example output:

"pulsar://broker1.org.com:4480"

Look up partitioned topic's owner broker​

You can locate the owner broker of the given partitioned topic in the following ways.

pulsar-admin topics partitioned-lookup persistent://test-tenant/ns1/my-topic

Example output:

"persistent://test-tenant/ns1/my-topic-partition-0   pulsar://localhost:6650"
"persistent://test-tenant/ns1/my-topic-partition-1 pulsar://localhost:6650"
"persistent://test-tenant/ns1/my-topic-partition-2 pulsar://localhost:6650"
"persistent://test-tenant/ns1/my-topic-partition-3 pulsar://localhost:6650"

Lookup the partitioned topics sorted by broker URL

pulsar-admin topics partitioned-lookup \
persistent://test-tenant/ns1/my-topic --sort-by-broker

Example output:

pulsar://localhost:6650   [persistent://test-tenant/ns1/my-topic-partition-0, persistent://test-tenant/ns1/my-topic-partition-1, persistent://test-tenant/ns1/my-topic-partition-2, persistent://test-tenant/ns1/my-topic-partition-3]

Get bundle​

You can get the range of the bundle that the given topic belongs to in the following ways.

pulsar-admin topics bundle-range persistent://test-tenant/ns1/tp1

Example output:

"0x00000000_0xffffffff"

Get subscriptions​

You can check all subscription names for a given topic in the following ways.

pulsar-admin topics subscriptions persistent://test-tenant/ns1/tp1

Example output:

my-subscription

Last Message Id​

You can get the last committed message ID for a persistent topic. It is available since 2.3.0 release.

pulsar-admin topics last-message-id topic-name

Example output:

{
"ledgerId" : 97,
"entryId" : 9,
"partitionIndex" : -1
}

Get backlog size​

You can get the backlog size of a single partition topic or a non-partitioned topic with a given message ID (in bytes).

pulsar-admin topics get-backlog-size \
-m 1:1 \
persistent://test-tenant/ns1/tp1-partition-0

Configure deduplication snapshot interval​

Get deduplication snapshot interval​

To get the topic-level deduplication snapshot interval, use one of the following methods.

pulsar-admin topics get-deduplication-snapshot-interval my-topic

Set deduplication snapshot interval​

To set the topic-level deduplication snapshot interval, use one of the following methods.

Prerequisite brokerDeduplicationEnabled must be set to true.

pulsar-admin topics set-deduplication-snapshot-interval my-topic -i 1000

Remove deduplication snapshot interval​

To remove the topic-level deduplication snapshot interval, use one of the following methods.

pulsar-admin topics remove-deduplication-snapshot-interval my-topic

Configure inactive topic policies​

Get inactive topic policies​

To get the topic-level inactive topic policies, use one of the following methods.

pulsar-admin topics get-inactive-topic-policies my-topic

Set inactive topic policies​

To set the topic-level inactive topic policies, use one of the following methods.

pulsar-admin topics set-inactive-topic-policies my-topic

Remove inactive topic policies​

To remove the topic-level inactive topic policies, use one of the following methods.

pulsar-admin topics remove-inactive-topic-policies my-topic

Configure offload policies​

Get offload policies​

To get the topic-level offload policies, use one of the following methods.

pulsar-admin topics get-offload-policies my-topic

Set offload policies​

To set the topic-level offload policies, use one of the following methods.

pulsar-admin topics set-offload-policies my-topic

Remove offload policies​

To remove the topic-level offload policies, use one of the following methods.

pulsar-admin topics remove-offload-policies my-topic

Manage non-partitioned topics​

You can use Pulsar admin API to create, delete and check the status of non-partitioned topics.

Create​

Non-partitioned topics must be explicitly created. When creating a new non-partitioned topic, you need to provide a name for the topic.

By default, 60 seconds after creation, topics are considered inactive and deleted automatically to avoid generating trash data. To disable this feature, set brokerDeleteInactiveTopicsEnabled to false. To change the frequency of checking inactive topics, set brokerDeleteInactiveTopicsFrequencySeconds to a specific value.

For more information about the two parameters, see here.

You can create non-partitioned topics in the following ways.

When you create non-partitioned topics with the create command, you need to specify the topic name as an argument.

pulsar-admin topics create \
persistent://my-tenant/my-namespace/my-topic
note

When you create a non-partitioned topic with the suffix '-partition-' followed by numeric value like 'xyz-topic-partition-x' for the topic name, if a partitioned topic with same suffix 'xyz-topic-partition-y' exists, then the numeric value(x) for the non-partitioned topic must be larger than the number of partitions(y) of the partitioned topic. Otherwise, you cannot create such a non-partitioned topic.

Delete​

You can delete non-partitioned topics in the following ways.

pulsar-admin topics delete \
persistent://my-tenant/my-namespace/my-topic

List​

You can get the list of topics under a given namespace in the following ways.

pulsar-admin topics list tenant/namespace

Example output:

persistent://tenant/namespace/topic1
persistent://tenant/namespace/topic2

Stats​

You can check the current statistics of a given topic and its connected producers and consumers in the following ways.

pulsar-admin topics stats \
persistent://test-tenant/namespace/topic \
--get-precise-backlog

The following is an example. For the description of topic stats, see Pulsar statistics.

{
"msgRateIn": 4641.528542257553,
"msgThroughputIn": 44663039.74947473,
"msgRateOut": 0,
"msgThroughputOut": 0,
"averageMsgSize": 1232439.816728665,
"storageSize": 135532389160,
"publishers": [
{
"msgRateIn": 57.855383881403576,
"msgThroughputIn": 558994.7078932219,
"averageMsgSize": 613135,
"producerId": 0,
"producerName": null,
"address": null,
"connectedSince": null
}
],
"subscriptions": {
"my-topic_subscription": {
"msgRateOut": 0,
"msgThroughputOut": 0,
"msgBacklog": 116632,
"type": null,
"msgRateExpired": 36.98245516804671,
"consumers": []
}
},
"replication": {}
}

Internal stats​

You can check the detailed statistics of a topic. The following is an example. For the description of each internal topic stats, see Pulsar statistics.

{
"entriesAddedCounter": 20449518,
"numberOfEntries": 3233,
"totalSize": 331482,
"currentLedgerEntries": 3233,
"currentLedgerSize": 331482,
"lastLedgerCreatedTimestamp": "2016-06-29 03:00:23.825",
"lastLedgerCreationFailureTimestamp": null,
"waitingCursorsCount": 1,
"pendingAddEntriesCount": 0,
"lastConfirmedEntry": "324711539:3232",
"state": "LedgerOpened",
"ledgers": [
{
"ledgerId": 324711539,
"entries": 0,
"size": 0
}
],
"cursors": {
"my-subscription": {
"markDeletePosition": "324711539:3133",
"readPosition": "324711539:3233",
"waitingReadOp": true,
"pendingReadOps": 0,
"messagesConsumedCounter": 20449501,
"cursorLedger": 324702104,
"cursorLedgerLastEntry": 21,
"individuallyDeletedMessages": "[(324711539:3134‥324711539:3136], (324711539:3137‥324711539:3140], ]",
"lastLedgerSwitchTimestamp": "2016-06-29 01:30:19.313",
"state": "Open"
}
}
}

You can get the internal stats for the partitioned topic in the following ways.

pulsar-admin topics stats-internal \
persistent://test-tenant/namespace/topic

Manage partitioned topics​

You can use Pulsar admin API to create, update, delete and check the status of partitioned topics.

Create​

When creating a new partitioned topic, you need to provide a name and the number of partitions for the topic.

note

By default, if there are no messages 60 seconds after creation, topics are considered inactive and deleted automatically to avoid generating trash data. To disable this feature, set brokerDeleteInactiveTopicsEnabled to false. To change the frequency of checking inactive topics, set brokerDeleteInactiveTopicsFrequencySeconds to a specific value.

For more information about the two parameters, see here.

You can create partitioned topics in the following ways.

When you create partitioned topics with the create-partitioned-topic command, you need to specify the topic name as an argument and the number of partitions using the -p or --partitions flag.

pulsar-admin topics create-partitioned-topic \
persistent://my-tenant/my-namespace/my-topic \
--partitions 4
note

If a non-partitioned topic with the suffix '-partition-' followed by a numeric value like 'xyz-topic-partition-10', you can not create a partitioned topic with name 'xyz-topic', because the partitions of the partitioned topic could override the existing non-partitioned topic. To create such partitioned topic, you have to delete that non-partitioned topic first.

Create missed partitions​

When topic auto-creation is disabled, and you have a partitioned topic without any partitions, you can use the create-missed-partitions command to create partitions for the topic.

You can create missed partitions with the create-missed-partitions command and specify the topic name as an argument.

pulsar-admin topics create-missed-partitions \
persistent://my-tenant/my-namespace/my-topic

Get metadata​

Partitioned topics are associated with metadata, you can view it as a JSON object. The following metadata field is available.

FieldDescription
partitionsThe number of partitions into which the topic is divided.

You can check the number of partitions in a partitioned topic with the get-partitioned-topic-metadata subcommand.

pulsar-admin topics get-partitioned-topic-metadata \
persistent://my-tenant/my-namespace/my-topic

Example output:

{
"partitions" : 4,
"deleted" : false
}

Update​

You can update the number of partitions for an existing partitioned topic. However, you can only increase the number of partitions. Decrementing the number of partitions would delete the topic, which is not supported in Pulsar.

Producers and consumers can find the newly created partitions automatically.

You can update partitioned topics with the update-partitioned-topic command.

pulsar-admin topics update-partitioned-topic \
persistent://my-tenant/my-namespace/my-topic \
--partitions 8

Delete​

You can delete partitioned topics with the delete-partitioned-topic command, REST API and Java.

pulsar-admin topics delete-partitioned-topic \
persistent://my-tenant/my-namespace/my-topic

List​

You can get the list of partitioned topics under a given namespace in the following ways.

pulsar-admin topics list-partitioned-topics tenant/namespace

Example output:

persistent://tenant/namespace/topic1
persistent://tenant/namespace/topic2

Stats​

You can check the current statistics of a given partitioned topic and its connected producers and consumers in the following ways.

pulsar-admin topics partitioned-stats \
persistent://test-tenant/namespace/topic \
--per-partition

The following is an example. For the description of each topic stats, see Pulsar statistics.

Note that in the subscription JSON object, chuckedMessageRate is deprecated. Please use chunkedMessageRate. Both will be sent in the JSON for now.

{
"msgRateIn" : 999.992947159793,
"msgThroughputIn" : 1070918.4635439808,
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesInCounter" : 270318763,
"msgInCounter" : 252489,
"bytesOutCounter" : 0,
"msgOutCounter" : 0,
"averageMsgSize" : 1070.926056966454,
"msgChunkPublished" : false,
"storageSize" : 270316646,
"backlogSize" : 200921133,
"publishers" : [ {
"msgRateIn" : 999.992947159793,
"msgThroughputIn" : 1070918.4635439808,
"averageMsgSize" : 1070.3333333333333,
"chunkedMessageRate" : 0.0,
"producerId" : 0
} ],
"subscriptions" : {
"test" : {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 0,
"msgOutCounter" : 0,
"msgRateRedeliver" : 0.0,
"chuckedMessageRate" : 0,
"chunkedMessageRate" : 0,
"msgBacklog" : 144318,
"msgBacklogNoDelayed" : 144318,
"blockedSubscriptionOnUnackedMsgs" : false,
"msgDelayed" : 0,
"unackedMessages" : 0,
"msgRateExpired" : 0.0,
"lastExpireTimestamp" : 0,
"lastConsumedFlowTimestamp" : 0,
"lastConsumedTimestamp" : 0,
"lastAckedTimestamp" : 0,
"consumers" : [ ],
"isDurable" : true,
"isReplicated" : false
}
},
"replication" : { },
"metadata" : {
"partitions" : 3
},
"partitions" : { }
}

Internal stats​

You can check the detailed statistics of a partitioned topic. The following is an example. For the description of each internal topic stats, see Pulsar statistics.

{
"entriesAddedCounter": 20449518,
"numberOfEntries": 3233,
"totalSize": 331482,
"currentLedgerEntries": 3233,
"currentLedgerSize": 331482,
"lastLedgerCreatedTimestamp": "2016-06-29 03:00:23.825",
"lastLedgerCreationFailureTimestamp": null,
"waitingCursorsCount": 1,
"pendingAddEntriesCount": 0,
"lastConfirmedEntry": "324711539:3232",
"state": "LedgerOpened",
"ledgers": [
{
"ledgerId": 324711539,
"entries": 0,
"size": 0
}
],
"cursors": {
"my-subscription": {
"markDeletePosition": "324711539:3133",
"readPosition": "324711539:3233",
"waitingReadOp": true,
"pendingReadOps": 0,
"messagesConsumedCounter": 20449501,
"cursorLedger": 324702104,
"cursorLedgerLastEntry": 21,
"individuallyDeletedMessages": "[(324711539:3134‥324711539:3136], (324711539:3137‥324711539:3140], ]",
"lastLedgerSwitchTimestamp": "2016-06-29 01:30:19.313",
"state": "Open"
}
}
}

You can get the internal stats for the partitioned topic in the following ways.

pulsar-admin topics partitioned-stats-internal \
persistent://test-tenant/namespace/topic

Manage subscriptions​

You can use Pulsar admin API to create, check, and delete subscriptions.

Create subscription​

You can create a subscription for a topic using one of the following methods.

pulsar-admin topics create-subscription \
--subscription my-subscription \
persistent://test-tenant/ns1/tp1

Get subscription​

You can check all subscription names for a given topic using one of the following methods.

pulsar-admin topics subscriptions persistent://test-tenant/ns1/tp1

Example output:

my-subscription

Unsubscribe subscription​

When a subscription does not process messages anymore, you can unsubscribe it using one of the following methods.

pulsar-admin topics unsubscribe \
--subscription my-subscription \
persistent://test-tenant/ns1/tp1