Skip to main content

Manage topics

tip

This page only shows some frequently used operations.

  • For the latest and complete information about Pulsar admin, including commands, flags, descriptions, and more, see Pulsar admin doc.

  • For the latest and complete information about REST API, including parameters, responses, samples, and more, see REST API doc.

  • For the latest and complete information about Java admin API, including classes, methods, descriptions, and more, see Java admin API doc.

Pulsar has persistent and non-persistent topics. A persistent topic is a logical endpoint for publishing and consuming messages. The topic name structure for persistent topics is:

persistent://tenant/namespace/topic

Non-persistent topics are used in applications that only consume real-time published messages and do not need persistent guarantees. In this way, it reduces message-publish latency by removing overhead of persisting messages. The topic name structure for non-persistent topics is:

non-persistent://tenant/namespace/topic

Manage topic resources

Whether it is a persistent or non-persistent topic, you can obtain the topic resources through pulsar-admin tool, REST API and Java.

note

In REST API, :schema stands for persistent or non-persistent. :tenant, :namespace, :x are variables, replace them with the real tenant, namespace, and x names when using them. Take GET /admin/v2/:schema/:tenant/:namespace/getList as an example, to get the list of persistent topics in REST API, use https://pulsar.apache.org/admin/v2/persistent/my-tenant/my-namespace. To get the list of non-persistent topics in REST API, use https://pulsar.apache.org/admin/v2/non-persistent/my-tenant/my-namespace.

List of topics

You can get the list of topics under a given namespace in the following ways.

pulsar-admin topics list my-tenant/my-namespace

Grant permission

You can grant permissions on a client role to perform specific actions on a given topic in the following ways.

pulsar-admin topics grant-permission \
--actions produce,consume \
--role application1 \
persistent://test-tenant/ns1/tp1

Get permission

You can fetch permission in the following ways.

pulsar-admin topics permissions persistent://test-tenant/ns1/tp1

Example output:

application1    [consume, produce]

Revoke permission

You can revoke permissions granted on a client role in the following ways.

pulsar-admin topics revoke-permission \
--role application1 \
persistent://test-tenant/ns1/tp1

Delete topic

You can delete a topic in the following ways. You cannot delete a topic if any active subscription or producer is connected to the topic.

pulsar-admin topics delete persistent://test-tenant/ns1/tp1

Unload topic

You can unload a topic in the following ways.

pulsar-admin topics unload persistent://test-tenant/ns1/tp1

Get stats

For the detailed statistics of a topic, see Pulsar statistics.

The following is an example of a topic status.

{
"msgRateIn" : 0.0,
"msgThroughputIn" : 0.0,
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesInCounter" : 504,
"msgInCounter" : 9,
"bytesOutCounter" : 2296,
"msgOutCounter" : 41,
"averageMsgSize" : 0.0,
"msgChunkPublished" : false,
"storageSize" : 504,
"backlogSize" : 0,
"filteredEntriesCount" : 100,
"earliestMsgPublishTimeInBacklogs": 0,
"offloadedStorageSize" : 0,
"publishers" : [ {
"accessMode" : "Shared",
"msgRateIn" : 0.0,
"msgThroughputIn" : 0.0,
"averageMsgSize" : 0.0,
"chunkedMessageRate" : 0.0,
"producerId" : 0,
"metadata" : { },
"address" : "/127.0.0.1:65402",
"connectedSince" : "2021-06-09T17:22:55.913+08:00",
"clientVersion" : "2.9.0-SNAPSHOT",
"producerName" : "standalone-1-0"
} ],
"waitingPublishers" : 0,
"subscriptions" : {
"sub-demo" : {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 2296,
"msgOutCounter" : 41,
"msgRateRedeliver" : 0.0,
"chunkedMessageRate" : 0,
"msgBacklog" : 0,
"backlogSize" : 0,
"earliestMsgPublishTimeInBacklog": 0,
"msgBacklogNoDelayed" : 0,
"blockedSubscriptionOnUnackedMsgs" : false,
"msgDelayed" : 0,
"unackedMessages" : 0,
"type" : "Exclusive",
"activeConsumerName" : "20b81",
"msgRateExpired" : 0.0,
"totalMsgExpired" : 0,
"lastExpireTimestamp" : 0,
"lastConsumedFlowTimestamp" : 1623230565356,
"lastConsumedTimestamp" : 1623230583946,
"lastAckedTimestamp" : 1623230584033,
"lastMarkDeleteAdvancedTimestamp" : 1623230584033,
"filterProcessedMsgCount": 100,
"filterAcceptedMsgCount": 100,
"filterRejectedMsgCount": 0,
"filterRescheduledMsgCount": 0,
"consumers" : [ {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 2296,
"msgOutCounter" : 41,
"msgRateRedeliver" : 0.0,
"chunkedMessageRate" : 0.0,
"consumerName" : "20b81",
"availablePermits" : 959,
"unackedMessages" : 0,
"avgMessagesPerEntry" : 314,
"blockedConsumerOnUnackedMsgs" : false,
"lastAckedTimestamp" : 1623230584033,
"lastConsumedTimestamp" : 1623230583946,
"metadata" : { },
"address" : "/127.0.0.1:65172",
"connectedSince" : "2021-06-09T17:22:45.353+08:00",
"clientVersion" : "2.9.0-SNAPSHOT"
} ],
"allowOutOfOrderDelivery": false,
"consumersAfterMarkDeletePosition" : { },
"nonContiguousDeletedMessagesRanges" : 0,
"nonContiguousDeletedMessagesRangesSerializedSize" : 0,
"durable" : true,
"replicated" : false
}
},
"replication" : { },
"deduplicationStatus" : "Disabled",
"nonContiguousDeletedMessagesRanges" : 0,
"nonContiguousDeletedMessagesRangesSerializedSize" : 0,
"ownerBroker" : "localhost:8080"
}

To get the status of a topic, you can use the following ways.

pulsar-admin topics stats persistent://test-tenant/ns1/tp1

Get internal stats

For the detailed internal statistics inside a topic, see Pulsar statistics.

The following is an example of the internal statistics of a topic.

{
"entriesAddedCounter":0,
"numberOfEntries":0,
"totalSize":0,
"currentLedgerEntries":0,
"currentLedgerSize":0,
"lastLedgerCreatedTimestamp":"2021-01-22T21:12:14.868+08:00",
"lastLedgerCreationFailureTimestamp":null,
"waitingCursorsCount":0,
"pendingAddEntriesCount":0,
"lastConfirmedEntry":"3:-1",
"state":"LedgerOpened",
"ledgers":[
{
"ledgerId":3,
"entries":0,
"size":0,
"offloaded":false,
"metadata":null
}
],
"cursors":{
"test":{
"markDeletePosition":"3:-1",
"readPosition":"3:-1",
"waitingReadOp":false,
"pendingReadOps":0,
"messagesConsumedCounter":0,
"cursorLedger":4,
"cursorLedgerLastEntry":1,
"individuallyDeletedMessages":"[]",
"lastLedgerSwitchTimestamp":"2021-01-22T21:12:14.966+08:00",
"state":"Open",
"numberOfEntriesSinceFirstNotAckedMessage":0,
"totalNonContiguousDeletedMessagesRange":0,
"properties":{

}
}
},
"schemaLedgers":[
{
"ledgerId":1,
"entries":11,
"size":10,
"offloaded":false,
"metadata":null
}
],
"compactedLedger":{
"ledgerId":-1,
"entries":-1,
"size":-1,
"offloaded":false,
"metadata":null
}
}

To get the internal status of a topic, you can use the following ways.

pulsar-admin topics stats-internal persistent://test-tenant/ns1/tp1

Peek messages

You can peek a number of messages for a specific subscription of a given topic in the following ways.

pulsar-admin topics peek-messages \
--count 10 --subscription my-subscription \
persistent://test-tenant/ns1/tp1

Example output:

Message ID: 77:2
Publish time: 1668674963028
Event time: 0
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 2d 31 |hello-1 |
+--------+-------------------------------------------------+----------------+

Get message by ID

You can fetch the message with the given ledger ID and entry ID in the following ways.

pulsar-admin topics get-message-by-id \
-l 10 -e 0 persistent://public/default/my-topic

Examine messages

You can examine a specific message on a topic by position relative to the earliest or the latest message.

pulsar-admin topics examine-messages \
-i latest -m 1 persistent://public/default/my-topic

Get message ID

You can get message ID published at or just after the given datetime.

pulsar-admin topics get-message-id \
persistent://public/default/my-topic \
-d 2021-06-28T19:01:17Z

Skip messages

You can skip a number of messages for a specific subscription of a given topic in the following ways.

pulsar-admin topics skip \
--count 10 --subscription my-subscription \
persistent://test-tenant/ns1/tp1

Skip all messages

You can skip all the old messages for a specific subscription of a given topic.

pulsar-admin topics clear-backlog \
--subscription my-subscription \
persistent://test-tenant/ns1/tp1

Reset cursor

You can reset a subscription cursor position back to the position which is recorded X minutes before. It essentially calculates the time and position of the cursor at X minutes before and resets it at that position. You can reset the cursor in the following ways.

pulsar-admin topics reset-cursor \
--subscription my-subscription --time 10 \
persistent://test-tenant/ns1/tp1

Look up topic's owner broker

You can locate the owner broker of the given topic in the following ways.

pulsar-admin topics lookup persistent://test-tenant/ns1/tp1

Example output:

"pulsar://broker1.org.com:4480"

Look up partitioned topic's owner broker

You can locate the owner broker of the given partitioned topic in the following ways.

pulsar-admin topics partitioned-lookup persistent://test-tenant/ns1/my-topic

Example output:

"persistent://test-tenant/ns1/my-topic-partition-0   pulsar://localhost:6650"
"persistent://test-tenant/ns1/my-topic-partition-1 pulsar://localhost:6650"
"persistent://test-tenant/ns1/my-topic-partition-2 pulsar://localhost:6650"
"persistent://test-tenant/ns1/my-topic-partition-3 pulsar://localhost:6650"

Lookup the partitioned topics sorted by broker URL

pulsar-admin topics partitioned-lookup \
persistent://test-tenant/ns1/my-topic --sort-by-broker

Example output:

pulsar://localhost:6650   [persistent://test-tenant/ns1/my-topic-partition-0, persistent://test-tenant/ns1/my-topic-partition-1, persistent://test-tenant/ns1/my-topic-partition-2, persistent://test-tenant/ns1/my-topic-partition-3]

Get bundle

You can get the range of the bundle that the given topic belongs to in the following ways.

pulsar-admin topics bundle-range persistent://test-tenant/ns1/tp1

Example output:

"0x00000000_0xffffffff"

Get subscriptions

You can check all subscription names for a given topic in the following ways.

pulsar-admin topics subscriptions persistent://test-tenant/ns1/tp1

Example output:

my-subscription

Last Message Id

You can get the last committed message ID for a persistent topic. It is available since 2.3.0 release.

pulsar-admin topics last-message-id topic-name

Example output:

{
"ledgerId" : 97,
"entryId" : 9,
"partitionIndex" : -1
}

Get backlog size

You can get the backlog size of a single partition topic or a non-partitioned topic with a given message ID (in bytes).

pulsar-admin topics get-backlog-size \
-m 1:1 \
persistent://test-tenant/ns1/tp1-partition-0

Configure deduplication snapshot interval

Get deduplication snapshot interval

To get the topic-level deduplication snapshot interval, use one of the following methods.

pulsar-admin topics get-deduplication-snapshot-interval my-topic

Set deduplication snapshot interval

To set the topic-level deduplication snapshot interval, use one of the following methods.

Prerequisite brokerDeduplicationEnabled must be set to true.

pulsar-admin topics set-deduplication-snapshot-interval my-topic -i 1000

Remove deduplication snapshot interval

To remove the topic-level deduplication snapshot interval, use one of the following methods.

pulsar-admin topics remove-deduplication-snapshot-interval my-topic

Configure inactive topic policies

Get inactive topic policies

To get the topic-level inactive topic policies, use one of the following methods.

pulsar-admin topics get-inactive-topic-policies my-topic

Set inactive topic policies

To set the topic-level inactive topic policies, use one of the following methods.

pulsar-admin topics set-inactive-topic-policies my-topic

Remove inactive topic policies

To remove the topic-level inactive topic policies, use one of the following methods.

pulsar-admin topics remove-inactive-topic-policies my-topic

Configure offload policies

Get offload policies

To get the topic-level offload policies, use one of the following methods.

pulsar-admin topics get-offload-policies my-topic

Set offload policies

To set the topic-level offload policies, use one of the following methods.

pulsar-admin topics set-offload-policies my-topic

Remove offload policies

To remove the topic-level offload policies, use one of the following methods.

pulsar-admin topics remove-offload-policies my-topic

Manage non-partitioned topics

You can use Pulsar admin API to create, delete and check the status of non-partitioned topics.

Create

Non-partitioned topics must be explicitly created. When creating a new non-partitioned topic, you need to provide a name for the topic.

By default, 60 seconds after creation, topics are considered inactive and deleted automatically to avoid generating trash data. To disable this feature, set brokerDeleteInactiveTopicsEnabled to false. To change the frequency of checking inactive topics, set brokerDeleteInactiveTopicsFrequencySeconds to a specific value.

For more information about the two parameters, see here.

You can create non-partitioned topics in the following ways.

When you create non-partitioned topics with the create command, you need to specify the topic name as an argument.

pulsar-admin topics create \
persistent://my-tenant/my-namespace/my-topic
note

When you create a non-partitioned topic with the suffix '-partition-' followed by numeric value like 'xyz-topic-partition-x' for the topic name, if a partitioned topic with same suffix 'xyz-topic-partition-y' exists, then the numeric value(x) for the non-partitioned topic must be larger than the number of partitions(y) of the partitioned topic. Otherwise, you cannot create such a non-partitioned topic.

Delete

You can delete non-partitioned topics in the following ways.

pulsar-admin topics delete \
persistent://my-tenant/my-namespace/my-topic

List

You can get the list of topics under a given namespace in the following ways.

pulsar-admin topics list tenant/namespace

Example output:

persistent://tenant/namespace/topic1
persistent://tenant/namespace/topic2

Stats

You can check the current statistics of a given topic and its connected producers and consumers in the following ways.

pulsar-admin topics stats \
persistent://test-tenant/namespace/topic \
--get-precise-backlog

The following is an example. For the description of topic stats, see Pulsar statistics.

{
"msgRateIn": 4641.528542257553,
"msgThroughputIn": 44663039.74947473,
"msgRateOut": 0,
"msgThroughputOut": 0,
"averageMsgSize": 1232439.816728665,
"storageSize": 135532389160,
"publishers": [
{
"msgRateIn": 57.855383881403576,
"msgThroughputIn": 558994.7078932219,
"averageMsgSize": 613135,
"producerId": 0,
"producerName": null,
"address": null,
"connectedSince": null
}
],
"subscriptions": {
"my-topic_subscription": {
"msgRateOut": 0,
"msgThroughputOut": 0,
"msgBacklog": 116632,
"type": null,
"msgRateExpired": 36.98245516804671,
"consumers": []
}
},
"replication": {}
}

Internal stats

You can check the detailed statistics of a topic. The following is an example. For the description of each internal topic stats, see Pulsar statistics.

{
"entriesAddedCounter": 20449518,
"numberOfEntries": 3233,
"totalSize": 331482,
"currentLedgerEntries": 3233,
"currentLedgerSize": 331482,
"lastLedgerCreatedTimestamp": "2016-06-29 03:00:23.825",
"lastLedgerCreationFailureTimestamp": null,
"waitingCursorsCount": 1,
"pendingAddEntriesCount": 0,
"lastConfirmedEntry": "324711539:3232",
"state": "LedgerOpened",
"ledgers": [
{
"ledgerId": 324711539,
"entries": 0,
"size": 0
}
],
"cursors": {
"my-subscription": {
"markDeletePosition": "324711539:3133",
"readPosition": "324711539:3233",
"waitingReadOp": true,
"pendingReadOps": 0,
"messagesConsumedCounter": 20449501,
"cursorLedger": 324702104,
"cursorLedgerLastEntry": 21,
"individuallyDeletedMessages": "[(324711539:3134‥324711539:3136], (324711539:3137‥324711539:3140], ]",
"lastLedgerSwitchTimestamp": "2016-06-29 01:30:19.313",
"state": "Open"
}
}
}

You can get the internal stats for the partitioned topic in the following ways.

pulsar-admin topics stats-internal \
persistent://test-tenant/namespace/topic

Manage partitioned topics

You can use Pulsar admin API to create, update, delete and check the status of partitioned topics.

Create

When creating a new partitioned topic, you need to provide a name and the number of partitions for the topic.

note

By default, if there are no messages 60 seconds after creation, topics are considered inactive and deleted automatically to avoid generating trash data. To disable this feature, set brokerDeleteInactiveTopicsEnabled to false. To change the frequency of checking inactive topics, set brokerDeleteInactiveTopicsFrequencySeconds to a specific value.

For more information about the two parameters, see here.

You can create partitioned topics in the following ways.

When you create partitioned topics with the create-partitioned-topic command, you need to specify the topic name as an argument and the number of partitions using the -p or --partitions flag.

pulsar-admin topics create-partitioned-topic \
persistent://my-tenant/my-namespace/my-topic \
--partitions 4
note

If a non-partitioned topic with the suffix '-partition-' followed by a numeric value like 'xyz-topic-partition-10', you can not create a partitioned topic with name 'xyz-topic', because the partitions of the partitioned topic could override the existing non-partitioned topic. To create such partitioned topic, you have to delete that non-partitioned topic first.

Create missed partitions

When topic auto-creation is disabled, and you have a partitioned topic without any partitions, you can use the create-missed-partitions command to create partitions for the topic.

You can create missed partitions with the create-missed-partitions command and specify the topic name as an argument.

pulsar-admin topics create-missed-partitions \
persistent://my-tenant/my-namespace/my-topic

Get metadata

Partitioned topics are associated with metadata, you can view it as a JSON object. The following metadata field is available.

FieldDescription
partitionsThe number of partitions into which the topic is divided.

You can check the number of partitions in a partitioned topic with the get-partitioned-topic-metadata subcommand.

pulsar-admin topics get-partitioned-topic-metadata \
persistent://my-tenant/my-namespace/my-topic

Example output:

{
"partitions" : 4,
"deleted" : false
}

Update

You can update the number of partitions for an existing partitioned topic if the topic is non-global. However, you can only add the partition number. Decrementing the number of partitions would delete the topic, which is not supported in Pulsar.

Producers and consumers can find the newly created partitions automatically.

You can update partitioned topics with the update-partitioned-topic command.

pulsar-admin topics update-partitioned-topic \
persistent://my-tenant/my-namespace/my-topic \
--partitions 8

Delete

You can delete partitioned topics with the delete-partitioned-topic command, REST API and Java.

pulsar-admin topics delete-partitioned-topic \
persistent://my-tenant/my-namespace/my-topic

List

You can get the list of partitioned topics under a given namespace in the following ways.

pulsar-admin topics list-partitioned-topics tenant/namespace

Example output:

persistent://tenant/namespace/topic1
persistent://tenant/namespace/topic2

Stats

You can check the current statistics of a given partitioned topic and its connected producers and consumers in the following ways.

pulsar-admin topics partitioned-stats \
persistent://test-tenant/namespace/topic \
--per-partition

The following is an example. For the description of each topic stats, see Pulsar statistics.

Note that in the subscription JSON object, chuckedMessageRate is deprecated. Please use chunkedMessageRate. Both will be sent in the JSON for now.

{
"msgRateIn" : 999.992947159793,
"msgThroughputIn" : 1070918.4635439808,
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesInCounter" : 270318763,
"msgInCounter" : 252489,
"bytesOutCounter" : 0,
"msgOutCounter" : 0,
"averageMsgSize" : 1070.926056966454,
"msgChunkPublished" : false,
"storageSize" : 270316646,
"backlogSize" : 200921133,
"publishers" : [ {
"msgRateIn" : 999.992947159793,
"msgThroughputIn" : 1070918.4635439808,
"averageMsgSize" : 1070.3333333333333,
"chunkedMessageRate" : 0.0,
"producerId" : 0
} ],
"subscriptions" : {
"test" : {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 0,
"msgOutCounter" : 0,
"msgRateRedeliver" : 0.0,
"chuckedMessageRate" : 0,
"chunkedMessageRate" : 0,
"msgBacklog" : 144318,
"msgBacklogNoDelayed" : 144318,
"blockedSubscriptionOnUnackedMsgs" : false,
"msgDelayed" : 0,
"unackedMessages" : 0,
"msgRateExpired" : 0.0,
"lastExpireTimestamp" : 0,
"lastConsumedFlowTimestamp" : 0,
"lastConsumedTimestamp" : 0,
"lastAckedTimestamp" : 0,
"consumers" : [ ],
"isDurable" : true,
"isReplicated" : false
}
},
"replication" : { },
"metadata" : {
"partitions" : 3
},
"partitions" : { }
}

Internal stats

You can check the detailed statistics of a partitioned topic. The following is an example. For the description of each internal topic stats, see Pulsar statistics.

{
"entriesAddedCounter": 20449518,
"numberOfEntries": 3233,
"totalSize": 331482,
"currentLedgerEntries": 3233,
"currentLedgerSize": 331482,
"lastLedgerCreatedTimestamp": "2016-06-29 03:00:23.825",
"lastLedgerCreationFailureTimestamp": null,
"waitingCursorsCount": 1,
"pendingAddEntriesCount": 0,
"lastConfirmedEntry": "324711539:3232",
"state": "LedgerOpened",
"ledgers": [
{
"ledgerId": 324711539,
"entries": 0,
"size": 0
}
],
"cursors": {
"my-subscription": {
"markDeletePosition": "324711539:3133",
"readPosition": "324711539:3233",
"waitingReadOp": true,
"pendingReadOps": 0,
"messagesConsumedCounter": 20449501,
"cursorLedger": 324702104,
"cursorLedgerLastEntry": 21,
"individuallyDeletedMessages": "[(324711539:3134‥324711539:3136], (324711539:3137‥324711539:3140], ]",
"lastLedgerSwitchTimestamp": "2016-06-29 01:30:19.313",
"state": "Open"
}
}
}

You can get the internal stats for the partitioned topic in the following ways.

pulsar-admin topics partitioned-stats-internal \
persistent://test-tenant/namespace/topic

Manage subscriptions

You can use Pulsar admin API to create, check, and delete subscriptions.

Create subscription

You can create a subscription for a topic using one of the following methods.

pulsar-admin topics create-subscription \
--subscription my-subscription \
persistent://test-tenant/ns1/tp1

Get subscription

You can check all subscription names for a given topic using one of the following methods.

pulsar-admin topics subscriptions persistent://test-tenant/ns1/tp1

Example output:

my-subscription

Unsubscribe subscription

When a subscription does not process messages anymore, you can unsubscribe it using one of the following methods.

pulsar-admin topics unsubscribe \
--subscription my-subscription \
persistent://test-tenant/ns1/tp1