Get started
This guide walks you through the quickest way to get started with the following methods to manage topics.
- pulsar-admin
- REST API
- Java
pulsar-admin CLI is a command-line tool and is available in the bin folder of your Pulsar installation.
REST API belongs to HTTP calls, which are made against the admin APIs provided by brokers. In addition, both the Java admin API and pulsar-admin CLI use the REST API.
Java admin API is a programmable interface written in Java.
Check the detailed steps below.
- pulsar-admin
- REST API
- Java
To manage topics using pulsar-admin CLI, complte the following steps.
-
Set the service URL.
-
Create a partitioned topic.
-
Update the number of a partition.
-
Produce messages to the topic.
-
Check the stats of the topic.
-
Delete the topic.
Prerequisites
- Install and start Pulsar standalone. This tutorial runs Pulsar 2.11 as an example.
Steps
Step 1: Set the service URLs to point to the broker service in client.conf.
webServiceUrl=http://localhost:8080/
brokerServiceUrl=pulsar://localhost:6650/
Step 2: Create a persistent topic named test-topic-1 with 6 partitions.
Input
bin/pulsar-admin topics create-partitioned-topic \
persistent://public/default/test-topic-1 \
--partitions 6
Output
There is no output. You can check the status of the topic in Step 5.
Step 3: Update the number of the partition to 8.
Input
bin/pulsar-admin topics update-partitioned-topic \
persistent://public/default/test-topic-1 \
--partitions 8
Output
There is no output. You can check the number of partitions in Step 5.
Step 4: Produce some messages to the partitioned topic test-topic-1.
Input
bin/pulsar-perf produce -u pulsar://localhost:6650 -r 1000 -i 1000 persistent://public/default/test-topic-1
Output
2023-03-07T15:33:56,832+0800 [main] INFO org.apache.pulsar.testclient.PerformanceProducer - Starting Pulsar perf producer with config: {
"confFile" : "/Users/yu/apache-pulsar-2.11.0/conf/client.conf",
"serviceURL" : "pulsar://localhost:6650",
"authPluginClassName" : "",
"authParams" : "",
"tlsTrustCertsFilePath" : "",
"tlsAllowInsecureConnection" : false,
"tlsHostnameVerificationEnable" : false,
"maxConnections" : 1,
"statsIntervalSeconds" : 1000,
"ioThreads" : 1,
"enableBusyWait" : false,
"listenerName" : null,
"listenerThreads" : 1,
"maxLookupRequest" : 50000,
"topics" : [ "persistent://public/default/test-topic-1" ],
"numTestThreads" : 1,
"msgRate" : 1000,
"msgSize" : 1024,
"numTopics" : 1,
"numProducers" : 1,
"separator" : "-",
"sendTimeout" : 0,
"producerName" : null,
"adminURL" : "http://localhost:8080/",
...
2023-03-07T15:35:03,769+0800 [Thread-0] INFO org.apache.pulsar.testclient.PerformanceProducer - Aggregated latency stats --- Latency: mean: 8.931 ms - med: 3.775 - 95pct: 32.144 - 99pct: 98.432 - 99.9pct: 216.088 - 99.99pct: 304.807 - 99.999pct: 349.391 - Max: 351.235
Step 5: Check the internal stats of the partitioned topic test-topic-1.
Input
bin/pulsar-admin topics partitioned-stats-internal \
persistent://public/default/test-topic-1
Output
Below is a part of the output. For detailed explanations of topic stats, see Pulsar statistics.
{
"metadata" : {
"partitions" : 8
},
"partitions" : {
"persistent://public/default/test-topic-1-partition-1" : {
"entriesAddedCounter" : 4213,
"numberOfEntries" : 4213,
"totalSize" : 8817693,
"currentLedgerEntries" : 4212,
"currentLedgerSize" : 8806289,
"lastLedgerCreatedTimestamp" : "2023-03-07T15:33:59.367+08:00",
"waitingCursorsCount" : 0,
"pendingAddEntriesCount" : 0,
"lastConfirmedEntry" : "65:4211",
"state" : "LedgerOpened",
"ledgers" : [ {
"ledgerId" : 49,
"entries" : 1,
"size" : 11404,
"offloaded" : false,
"underReplicated" : false
}, {
"ledgerId" : 65,
"entries" : 0,
"size" : 0,
"offloaded" : false,
"underReplicated" : false
} ],
"cursors" : {
"test-subscriptio-1" : {
"markDeletePosition" : "49:-1",
"readPosition" : "49:0",
"waitingReadOp" : false,
"pendingReadOps" : 0,
"messagesConsumedCounter" : 0,
"cursorLedger" : -1,
"cursorLedgerLastEntry" : -1,
"individuallyDeletedMessages" : "[]",
"lastLedgerSwitchTimestamp" : "2023-03-06T16:41:32.801+08:00",
"state" : "NoLedger",
"numberOfEntriesSinceFirstNotAckedMessage" : 1,
"totalNonContiguousDeletedMessagesRange" : 0,
"subscriptionHavePendingRead" : false,
"subscriptionHavePendingReplayRead" : false,
"properties" : { }
},
"test-subscription-1" : {
"markDeletePosition" : "49:-1",
"readPosition" : "49:0",
"waitingReadOp" : false,
"pendingReadOps" : 0,
"messagesConsumedCounter" : 0,
"cursorLedger" : -1,
"cursorLedgerLastEntry" : -1,
"individuallyDeletedMessages" : "[]",
"lastLedgerSwitchTimestamp" : "2023-03-06T16:41:32.801+08:00",
"state" : "NoLedger",
"numberOfEntriesSinceFirstNotAckedMessage" : 1,
"totalNonContiguousDeletedMessagesRange" : 0,
"subscriptionHavePendingRead" : false,
"subscriptionHavePendingReplayRead" : false,
"properties" : { }
}
},
"schemaLedgers" : [ ],
"compactedLedger" : {
"ledgerId" : -1,
"entries" : -1,
"size" : -1,
"offloaded" : false,
"underReplicated" : false
}
},
...
Step 6: Delete the topic test-topic-1.
Input
bin/pulsar-admin topics delete-partitioned-topic persistent://public/default/test-topic-1
Output
There is no output. You can verify whether the test-topic-1 exists or not using the following command.
Input
List topics in public/default
namespace.
bin/pulsar-admin topics list public/default
To manage topics using REST API, complete the following steps.
-
Create a partitioned topic
-
Update the number of a partition.
-
Produce messages to the topic.
-
Check the stats of the topic.
-
Delete the topic.
Prerequisites
- Install and start Pulsar standalone. This tutorial runs Pulsar 2.11 as an example.
Steps
Step 1: Create a persistent topic named test-topic-2 with 4 partitions.
Input
curl -X PUT http://localhost:8080/admin/v2/persistent/public/default/test-topic-2/partitions -H 'Content-Type: application/json' -d "4"
Output
There is no output. You can check the topic in Step 4.
Step 2: Update the number of the partition to 5.
Input
curl -X POST http://localhost:8080/admin/v2/persistent/public/default/test-topic-2/partitions -H 'Content-Type: application/json' -d "5"
Output
There is no output. You can check the status of the topic in Step 4.
Step 3: Produce some messages to the partitioned topic test-topic-2.
Input
bin/pulsar-perf produce -u pulsar://localhost:6650 -r 1000 -i 1000 persistent://public/default/test-topic-2
Output
2023-03-08T15:47:06,268+0800 [main] INFO org.apache.pulsar.testclient.PerformanceProducer - Starting Pulsar perf producer with config: {
"confFile" : "/Users/yu/apache-pulsar-2.11.0/conf/client.conf",
"serviceURL" : "pulsar://localhost:6650",
"authPluginClassName" : "",
"authParams" : "",
"tlsTrustCertsFilePath" : "",
"tlsAllowInsecureConnection" : false,
"tlsHostnameVerificationEnable" : false,
"maxConnections" : 1,
"statsIntervalSeconds" : 1000,
"ioThreads" : 1,
"enableBusyWait" : false,
"listenerName" : null,
"listenerThreads" : 1,
"maxLookupRequest" : 50000,
"topics" : [ "persistent://public/default/test-topic-2" ],
"numTestThreads" : 1,
"msgRate" : 1000,
"msgSize" : 1024,
"numTopics" : 1,
"numProducers" : 1,
"separator" : "-",
"sendTimeout" : 0,
"producerName" : null,
"adminURL" : "http://localhost:8080/",
"deprecatedAuthPluginClassName" : null,
"maxOutstanding" : 0,
"maxPendingMessagesAcrossPartitions" : 0,
"partitions" : null,
"numMessages" : 0,
"compression" : "NONE",
"payloadFilename" : null,
"payloadDelimiter" : "\\n",
"batchTimeMillis" : 1.0,
"batchMaxMessages" : 1000,
"batchMaxBytes" : 4194304,
"testTime" : 0,
"warmupTimeSeconds" : 1.0,
"encKeyName" : null,
"encKeyFile" : null,
"delay" : 0,
"exitOnFailure" : false,
"messageKeyGenerationMode" : null,
"producerAccessMode" : "Shared",
"formatPayload" : false,
"formatterClass" : "org.apache.pulsar.testclient.DefaultMessageFormatter",
"transactionTimeout" : 10,
"numMessagesPerTransaction" : 50,
"isEnableTransaction" : false,
"isAbortTransaction" : false,
"histogramFile" : null
}
...
2023-03-08T15:53:28,178+0800 [Thread-0] INFO org.apache.pulsar.testclient.PerformanceProducer - Aggregated latency stats --- Latency: mean: 4.481 ms - med: 2.918 - 95pct: 10.710 - 99pct: 38.928 - 99.9pct: 112.689 - 99.99pct: 154.241 - 99.999pct: 193.249 - Max: 241.717
Step 4: Check the internal stats of the topic test-topic-2.
Input
curl -X GET http://localhost:8080/admin/v2/persistent/public/default/test-topic-2/partitioned-internalStats
Output
For detailed explanations of topic stats, see Pulsar statistics.
{"metadata":{"partitions":5},"partitions":{"persistent://public/default/test-topic-2-partition-3":{"entriesAddedCounter":47087,"numberOfEntries":47087,"totalSize":80406959,"currentLedgerEntries":47087,"currentLedgerSize":80406959,"lastLedgerCreatedTimestamp":"2023-03-08T15:47:07.273+08:00","waitingCursorsCount":0,"pendingAddEntriesCount":0,"lastConfirmedEntry":"117:47086","state":"LedgerOpened","ledgers":[{"ledgerId":117,"entries":0,"size":0,"offloaded":false,"underReplicated":false}],"cursors":{},"schemaLedgers":[],"compactedLedger":{"ledgerId":-1,"entries":-1,"size":-1,"offloaded":false,"underReplicated":false}},"persistent://public/default/test-topic-2-partition-2":{"entriesAddedCounter":46995,"numberOfEntries":46995,"totalSize":80445417,"currentLedgerEntries":46995,"currentLedgerSize":80445417,"lastLedgerCreatedTimestamp":"2023-03-08T15:47:07.43+08:00","waitingCursorsCount":0,"pendingAddEntriesCount":0,"lastConfirmedEntry":"118:46994","state":"LedgerOpened","ledgers":[{"ledgerId":118,"entries":0,"size":0,"offloaded":false,"underReplicated":false}],...
Step 5: Delete the topic test-topic-2.
Input
curl -X DELETE http://localhost:8080/admin/v2/persistent/public/default/test-topic-2/partitions
Output
There is no output. You can verify whether the test-topic-2 exists or not using the following command.
Input
List topics in public/default
namespace.
curl -X GET http://localhost:8080/admin/v2/persistent/public/default
To manage topics using Java admin API, complete following steps.
-
Initiate a Pulsar Java client.
-
Create a partitioned topic
-
Update the number of a partition.
-
Produce messages to the topic.
-
Check the stats of the topic.
-
Delete the topic.
Prerequisites
-
Prepare a Java project and add the following dependency to your POM file.
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-admin</artifactId>
<version>3.3.2</version>
</dependency>
Steps
Step 1: Initiate a Pulsar Java client in your Java project.
Input
String url = "http://localhost:8080";
PulsarAdmin admin = PulsarAdmin.builder()
.serviceHttpUrl(url)
.build();
Step 2: Create a partitioned topic test-topic-1 with 4 partitions.
Input
admin.topics().createPartitionedTopic("persistent://public/default/test-topic-1", 4);
Step 3: Update the number of the partition to 5.
Input
admin.topics().updatePartitionedTopic("test-topic-1", 5);
Step 4: Produce some messages to the topic test-topic-1.
Input
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Producer<String> producer = client.newProducer(Schema.STRING)
.topic("test-topic-1")
.blockIfQueueFull(true)
.create();
for (int i = 0; i < 100; ++i) {
producer.newMessage().value("test").send();
}
producer.close();
client.close();
Step 5: Check the stats of the topic test-topic-1.
Input
PartitionedTopicStats stats = admin.topics().getPartitionedStats("persistent://public/default/test-topic-1",false);
System.out.println(stats.getMsgInCounter());
Output
100
Step 6: Delete the topic test-topic-1.
Input
admin.topics().deletePartitionedTopic("test-topic-1");
Related topics
-
To understand basics, see Pulsar admin API - Overview
-
To learn usage scenarios, see Pulsar admin API - Use cases.
-
To learn common administrative tasks, see Pulsar admin API - Features.
-
To perform administrative operations, see Pulsar admin API - Tools.
-
To check the detailed usage, see the references below.
-
Pulsar admin APIs