Get started
This guide walks you through the quickest way to get started with the following methods to manage topics.
- pulsar-admin
- REST API
- Java
pulsar-admin CLI: it’s a command-line tool and is available in the bin folder of your Pulsar installation.
REST API: HTTP calls, which are made against the admin APIs provided by brokers. In addition, both the Java admin API and pulsar-admin CLI use the REST API.
Java admin API: it’s a programmable interface written in Java.
Check the detailed steps below.
- pulsar-admin
- REST API
- Java
This tutorial guides you through every step of using pulsar-admin CLI to manage topics. It includes the following steps:
-
Set the service URL.
-
Create a partitioned topic.
-
Update the number of a partition.
-
Produce messages to the topic.
-
Check the stats of the topic.
-
Delete the topic.
Prerequisites
- Install and start Pulsar standalone. This tutorial runs Pulsar 2.11 as an example.
Steps
-
Set the service URLs to point to the broker service in client.conf.
webServiceUrl=http://localhost:8080/brokerServiceUrl=pulsar://localhost:6650/ -
Create a persistent topic named test-topic-1 with 6 partitions.
Input
bin/pulsar-admin topics create-partitioned-topic \persistent://public/default/test-topic-1 \--partitions 6Output
There is no output. You can check the status of the topic in Step 5.
-
Update the number of the partition to 8.
Input
bin/pulsar-admin topics update-partitioned-topic \persistent://public/default/test-topic-1 \--partitions 8Output
There is no output. You can check the number of partitions in Step 5.
-
Produce some messages to the partitioned topic test-topic-1.
Input
bin/pulsar-perf produce -u pulsar://localhost:6650 -r 1000 -i 1000 persistent://public/default/test-topic-1Output
2023-03-07T15:33:56,832+0800 [main] INFO org.apache.pulsar.testclient.PerformanceProducer - Starting Pulsar perf producer with config: {"confFile" : "/Users/yu/apache-pulsar-2.11.0/conf/client.conf","serviceURL" : "pulsar://localhost:6650","authPluginClassName" : "","authParams" : "","tlsTrustCertsFilePath" : "","tlsAllowInsecureConnection" : false,"tlsHostnameVerificationEnable" : false,"maxConnections" : 1,"statsIntervalSeconds" : 1000,"ioThreads" : 1,"enableBusyWait" : false,"listenerName" : null,"listenerThreads" : 1,"maxLookupRequest" : 50000,"topics" : [ "persistent://public/default/test-topic-1" ],"numTestThreads" : 1,"msgRate" : 1000,"msgSize" : 1024,"numTopics" : 1,"numProducers" : 1,"separator" : "-","sendTimeout" : 0,"producerName" : null,"adminURL" : "http://localhost:8080/",...2023-03-07T15:35:03,769+0800 [Thread-0] INFO org.apache.pulsar.testclient.PerformanceProducer - Aggregated latency stats --- Latency: mean: 8.931 ms - med: 3.775 - 95pct: 32.144 - 99pct: 98.432 - 99.9pct: 216.088 - 99.99pct: 304.807 - 99.999pct: 349.391 - Max: 351.235 -
Check the internal stats of the partitioned topic test-topic-1.
Input
bin/pulsar-admin topics partitioned-stats-internal \persistent://public/default/test-topic-1Output
Below is a part of the output. For detailed explanations of topic stats, see Pulsar statistics.
{"metadata" : {"partitions" : 8},"partitions" : {"persistent://public/default/test-topic-1-partition-1" : {"entriesAddedCounter" : 4213,"numberOfEntries" : 4213,"totalSize" : 8817693,"currentLedgerEntries" : 4212,"currentLedgerSize" : 8806289,"lastLedgerCreatedTimestamp" : "2023-03-07T15:33:59.367+08:00","waitingCursorsCount" : 0,"pendingAddEntriesCount" : 0,"lastConfirmedEntry" : "65:4211","state" : "LedgerOpened","ledgers" : [ {"ledgerId" : 49,"entries" : 1,"size" : 11404,"offloaded" : false,"underReplicated" : false}, {"ledgerId" : 65,"entries" : 0,"size" : 0,"offloaded" : false,"underReplicated" : false} ],"cursors" : {"test-subscriptio-1" : {"markDeletePosition" : "49:-1","readPosition" : "49:0","waitingReadOp" : false,"pendingReadOps" : 0,"messagesConsumedCounter" : 0,"cursorLedger" : -1,"cursorLedgerLastEntry" : -1,"individuallyDeletedMessages" : "[]","lastLedgerSwitchTimestamp" : "2023-03-06T16:41:32.801+08:00","state" : "NoLedger","numberOfEntriesSinceFirstNotAckedMessage" : 1,"totalNonContiguousDeletedMessagesRange" : 0,"subscriptionHavePendingRead" : false,"subscriptionHavePendingReplayRead" : false,"properties" : { }},"test-subscription-1" : {"markDeletePosition" : "49:-1","readPosition" : "49:0","waitingReadOp" : false,"pendingReadOps" : 0,"messagesConsumedCounter" : 0,"cursorLedger" : -1,"cursorLedgerLastEntry" : -1,"individuallyDeletedMessages" : "[]","lastLedgerSwitchTimestamp" : "2023-03-06T16:41:32.801+08:00","state" : "NoLedger","numberOfEntriesSinceFirstNotAckedMessage" : 1,"totalNonContiguousDeletedMessagesRange" : 0,"subscriptionHavePendingRead" : false,"subscriptionHavePendingReplayRead" : false,"properties" : { }}},"schemaLedgers" : [ ],"compactedLedger" : {"ledgerId" : -1,"entries" : -1,"size" : -1,"offloaded" : false,"underReplicated" : false}},... -
Delete the topic test-topic-1.
Input
bin/pulsar-admin topics delete-partitioned-topic persistent://public/default/test-topic-1Output
There is no output. You can verify whether the test-topic-1 exists or not using the following command.
Input
List topics in
public/defaultnamespace.bin/pulsar-admin topics list public/default
This tutorial guides you through every step of using REST API to manage topics. It includes the following steps:
-
Create a partitioned topic
-
Update the number of a partition.
-
Produce messages to the topic.
-
Check the stats of the topic.
-
Delete the topic.
Prerequisites
- Install and start Pulsar standalone. This tutorial runs Pulsar 2.11 as an example.
Steps
-
Create a persistent topic named test-topic-2 with 4 partitions.
Input
curl -X PUT http://localhost:8080/admin/v2/persistent/public/default/test-topic-2/partitions -H 'Content-Type: application/json' -d "4"Output
There is no output. You can check the topic in Step 4.
-
Update the number of the partition to 5.
Input
curl -X POST http://localhost:8080/admin/v2/persistent/public/default/test-topic-2/partitions -H 'Content-Type: application/json' -d "5"Output
There is no output. You can check the status of the topic in Step 4.
-
Produce some messages to the partitioned topic test-topic-2.
Input
bin/pulsar-perf produce -u pulsar://localhost:6650 -r 1000 -i 1000 persistent://public/default/test-topic-2Output
2023-03-08T15:47:06,268+0800 [main] INFO org.apache.pulsar.testclient.PerformanceProducer - Starting Pulsar perf producer with config: {"confFile" : "/Users/yu/apache-pulsar-2.11.0/conf/client.conf","serviceURL" : "pulsar://localhost:6650","authPluginClassName" : "","authParams" : "","tlsTrustCertsFilePath" : "","tlsAllowInsecureConnection" : false,"tlsHostnameVerificationEnable" : false,"maxConnections" : 1,"statsIntervalSeconds" : 1000,"ioThreads" : 1,"enableBusyWait" : false,"listenerName" : null,"listenerThreads" : 1,"maxLookupRequest" : 50000,"topics" : [ "persistent://public/default/test-topic-2" ],"numTestThreads" : 1,"msgRate" : 1000,"msgSize" : 1024,"numTopics" : 1,"numProducers" : 1,"separator" : "-","sendTimeout" : 0,"producerName" : null,"adminURL" : "http://localhost:8080/","deprecatedAuthPluginClassName" : null,"maxOutstanding" : 0,"maxPendingMessagesAcrossPartitions" : 0,"partitions" : null,"numMessages" : 0,"compression" : "NONE","payloadFilename" : null,"payloadDelimiter" : "\\n","batchTimeMillis" : 1.0,"batchMaxMessages" : 1000,"batchMaxBytes" : 4194304,"testTime" : 0,"warmupTimeSeconds" : 1.0,"encKeyName" : null,"encKeyFile" : null,"delay" : 0,"exitOnFailure" : false,"messageKeyGenerationMode" : null,"producerAccessMode" : "Shared","formatPayload" : false,"formatterClass" : "org.apache.pulsar.testclient.DefaultMessageFormatter","transactionTimeout" : 10,"numMessagesPerTransaction" : 50,"isEnableTransaction" : false,"isAbortTransaction" : false,"histogramFile" : null}...2023-03-08T15:53:28,178+0800 [Thread-0] INFO org.apache.pulsar.testclient.PerformanceProducer - Aggregated latency stats --- Latency: mean: 4.481 ms - med: 2.918 - 95pct: 10.710 - 99pct: 38.928 - 99.9pct: 112.689 - 99.99pct: 154.241 - 99.999pct: 193.249 - Max: 241.717 -
Check the internal stats of the topic test-topic-2.
Input
curl -X GET http://localhost:8080/admin/v2/persistent/public/default/test-topic-2/partitioned-internalStatsOutput
For detailed explanations of topic stats, see Pulsar statistics.
{"metadata":{"partitions":5},"partitions":{"persistent://public/default/test-topic-2-partition-3":{"entriesAddedCounter":47087,"numberOfEntries":47087,"totalSize":80406959,"currentLedgerEntries":47087,"currentLedgerSize":80406959,"lastLedgerCreatedTimestamp":"2023-03-08T15:47:07.273+08:00","waitingCursorsCount":0,"pendingAddEntriesCount":0,"lastConfirmedEntry":"117:47086","state":"LedgerOpened","ledgers":[{"ledgerId":117,"entries":0,"size":0,"offloaded":false,"underReplicated":false}],"cursors":{},"schemaLedgers":[],"compactedLedger":{"ledgerId":-1,"entries":-1,"size":-1,"offloaded":false,"underReplicated":false}},"persistent://public/default/test-topic-2-partition-2":{"entriesAddedCounter":46995,"numberOfEntries":46995,"totalSize":80445417,"currentLedgerEntries":46995,"currentLedgerSize":80445417,"lastLedgerCreatedTimestamp":"2023-03-08T15:47:07.43+08:00","waitingCursorsCount":0,"pendingAddEntriesCount":0,"lastConfirmedEntry":"118:46994","state":"LedgerOpened","ledgers":[{"ledgerId":118,"entries":0,"size":0,"offloaded":false,"underReplicated":false}],... -
Delete the topic test-topic-2.
Input
curl -X DELETE http://localhost:8080/admin/v2/persistent/public/default/test-topic-2/partitionsOutput
There is no output. You can verify whether the test-topic-2 exists or not using the following command.
Input
List topics in
public/defaultnamespace.curl -X GET http://localhost:8080/admin/v2/persistent/public/default
This tutorial guides you through every step of using Java admin API to manage topics. It includes the following steps:
-
Initiate a Pulsar Java client.
-
Create a partitioned topic
-
Update the number of a partition.
-
Produce messages to the topic.
-
Check the stats of the topic.
-
Delete the topic.
Prerequisites
-
Prepare a Java project and add the following dependency to your POM file.
<dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client-admin</artifactId><version>2.11.0</version></dependency>
Steps
-
Initiate a Pulsar Java client in your Java project.
Input
String url = "http://localhost:8080";PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(url).build(); -
Create a partitioned topic test-topic-1 with 4 partitions.
Input
admin.topics().createPartitionedTopic("persistent://public/default/test-topic-1", 4); -
Update the number of the partition to 5.
Input
admin.topics().updatePartitionedTopic("test-topic-1", 5); -
Produce some messages to the topic test-topic-1.
Input
PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();Producer<String> producer = client.newProducer(Schema.STRING).topic("test-topic-1").blockIfQueueFull(true).create();for (int i = 0; i < 100; ++i) {producer.newMessage().value("test").send();}producer.close();client.close(); -
Check the stats of the topic test-topic-1.
Input
PartitionedTopicStats stats = admin.topics().getPartitionedStats("persistent://public/default/test-topic-1",false);System.out.println(stats.getMsgInCounter());Output
100 -
Delete the topic test-topic-1.
Input
admin.topics().deletePartitionedTopic("test-topic-1");
Related topics
-
To understand basics, see Pulsar admin API - Overview
-
To learn usage scenarios, see Pulsar admin API - Use cases.
-
To learn common administrative tasks, see Pulsar admin API - Features.
-
To perform administrative operations, see Pulsar admin API - Tools.
-
To check the detailed usage, see the references below.
-
Pulsar admin APIs