You can use Pulsar's admin API to create and manage non-partitioned topics.
In all of the instructions and commands below, the topic name structure is: persistent://tenant/namespace/topic
Non-Partitioned topics resources
Non-partitioned topics in Pulsar must be explicitly created. When creating a new non-partitioned topic, you need to provide a name for the topic.
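As a rough illustration of how the topic names used in the examples on this page are put together, the sketch below builds and splits a name of the form persistent://tenant/namespace/topic. The class `TopicNameSketch` and its methods are hypothetical helpers written for this page, not part of the Pulsar client library, and the sketch assumes that only the persistent:// prefix is in use.

```java
/** Hypothetical helper for illustration; not part of the Pulsar client library. */
final class TopicNameSketch {
    // Scheme prefix used by the topic names on this page (assumption: no other prefix is handled).
    static final String PREFIX = "persistent://";

    /** Joins tenant, namespace, and topic into a full topic name. */
    static String build(String tenant, String namespace, String topic) {
        return PREFIX + tenant + "/" + namespace + "/" + topic;
    }

    /** Splits a full topic name into {tenant, namespace, topic}. */
    static String[] parse(String fullName) {
        if (!fullName.startsWith(PREFIX)) {
            throw new IllegalArgumentException("Expected a name starting with " + PREFIX);
        }
        // A limit of 3 keeps everything after the second slash in the topic part.
        String[] parts = fullName.substring(PREFIX.length()).split("/", 3);
        if (parts.length != 3) {
            throw new IllegalArgumentException("Expected tenant/namespace/topic");
        }
        for (String part : parts) {
            if (part.isEmpty()) {
                throw new IllegalArgumentException("Empty name component");
            }
        }
        return parts;
    }
}
```

For example, `TopicNameSketch.build("my-tenant", "my-namespace", "my-topic")` yields the name used in the commands on this page.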
By default, topics are considered inactive 60 seconds after creation and are deleted automatically to avoid accumulating trash data.
To disable this feature, set brokerDeleteInactiveTopicsEnabled to false.
To change the frequency of checking inactive topics, set brokerDeleteInactiveTopicsFrequencySeconds to your desired value.
For more information about these two parameters, see the broker configuration reference.
You can create non-partitioned topics using the create command and specifying the topic name as an argument. Here's an example:
bin/pulsar-admin topics create \
  persistent://my-tenant/my-namespace/my-topic
A non-partitioned topic whose name ends with the suffix '-partition-' followed by a numeric value, such as 'xyz-topic-partition-10', can be created only if a partitioned topic with the same base name ('xyz-topic' in this case) already exists and has more partitions than that numeric value (at least 11 in this case, because partition indexes start from 0). Otherwise, creation of the topic fails.
String topicName = "persistent://my-tenant/my-namespace/my-topic";
admin.topics().createNonPartitionedTopic(topicName);
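The naming rule for topics with a '-partition-' suffix can be sketched as a local check. This is only an illustration of the rule as described on this page, not the broker's actual validation code. The class `PartitionSuffixRule` and its methods are hypothetical, the partition count of the base topic is assumed to be looked up by the caller (0 if no partitioned topic with that name exists), and names without a numeric suffix are assumed to fall outside the rule.

```java
import java.math.BigInteger;

/** Hypothetical sketch of the '-partition-' naming rule; not Pulsar's own validation code. */
final class PartitionSuffixRule {
    private static final String MARKER = "-partition-";

    /** Returns the base name before "-partition-<number>", or null if the name has no such suffix. */
    static String baseTopicName(String topicName) {
        int idx = topicName.lastIndexOf(MARKER);
        if (idx < 0) {
            return null;
        }
        String suffix = topicName.substring(idx + MARKER.length());
        return suffix.matches("[0-9]+") ? topicName.substring(0, idx) : null;
    }

    /**
     * @param topicName             name of the non-partitioned topic to create
     * @param partitionsOfBaseTopic partition count of the partitioned topic with the base name,
     *                              or 0 if no such partitioned topic exists (assumed caller lookup)
     */
    static boolean isCreationAllowed(String topicName, int partitionsOfBaseTopic) {
        String base = baseTopicName(topicName);
        if (base == null) {
            return true; // no numeric '-partition-' suffix, so the rule does not apply
        }
        String digits = topicName.substring(base.length() + MARKER.length());
        // BigInteger avoids overflow when the suffix has many digits.
        BigInteger index = new BigInteger(digits);
        // Partition indexes start from 0, so the index must be below the partition count.
        return index.compareTo(BigInteger.valueOf(partitionsOfBaseTopic)) < 0;
    }
}
```

With 'xyz-topic' having 11 partitions, `isCreationAllowed("xyz-topic-partition-10", 11)` returns true, while the same call with 10 partitions returns false.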
Non-partitioned topics can be deleted using the delete command, specifying the topic by name:
bin/pulsar-admin topics delete \
  persistent://my-tenant/my-namespace/my-topic
The list command provides a list of the topics that exist under a given namespace.
pulsar-admin topics list tenant/namespace
persistent://tenant/namespace/topic1
persistent://tenant/namespace/topic2
The stats command shows the current statistics of a given topic. Here's an example payload:
The following stats are available:
|Stat|Description|
|---|---|
|msgRateIn|The sum of all local and replication publishers’ publish rates in messages per second|
|msgThroughputIn|Same as msgRateIn but in bytes per second instead of messages per second|
|msgRateOut|The sum of all local and replication consumers’ dispatch rates in messages per second|
|msgThroughputOut|Same as msgRateOut but in bytes per second instead of messages per second|
|averageMsgSize|Average message size, in bytes, from this publisher within the last interval|
|storageSize|The sum of the ledgers’ storage size for this topic|
|publishers|The list of all local publishers into the topic. There can be anywhere from zero to thousands.|
|producerId|Internal identifier for this producer on this topic|
|producerName|Internal identifier for this producer, generated by the client library|
|address|IP address and source port for the connection of this producer|
|connectedSince|Timestamp this producer was created or last reconnected|
|subscriptions|The list of all local subscriptions to the topic|
|my-subscription|The name of this subscription (client defined)|
|msgBacklog|The count of messages in backlog for this subscription|
|msgBacklogNoDelayed|The count of messages in backlog without delayed messages for this subscription|
|type|The type of this subscription|
|msgRateExpired|The rate at which messages were discarded instead of dispatched from this subscription due to TTL|
|consumers|The list of connected consumers for this subscription|
|consumerName|Internal identifier for this consumer, generated by the client library|
|availablePermits|The number of messages this consumer has space for in the client library’s listen queue. A value of 0 means the client library’s queue is full and receive() isn’t being called. A nonzero value means this consumer is ready to be dispatched messages.|
|replication|This section gives the stats for cross-colo replication of this topic|
|replicationBacklog|The outbound replication backlog in messages|
|connected|Whether the outbound replicator is connected|
|replicationDelayInSeconds|How long the oldest message has been waiting to be sent through the connection, if connected is true|
|inboundConnection|The IP and port of the broker in the remote cluster’s publisher connection to this broker|
|inboundConnectedSince|The TCP connection being used to publish messages to the remote cluster. If there are no local publishers connected, this connection is automatically closed after a minute.|
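One way to act on the availablePermits stat described above is to flag consumers whose value is 0, since that indicates a full client-side queue. The sketch below assumes the per-consumer values have already been extracted from the stats payload into a map. The class `PermitCheckSketch` and its method are hypothetical and do not use the Pulsar client library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical helper for illustration; works on values already read from the stats payload. */
final class PermitCheckSketch {
    /**
     * Returns the names of consumers whose availablePermits is 0, meaning the
     * client library's queue is full and receive() isn't being called.
     *
     * @param availablePermitsByConsumer consumerName mapped to its availablePermits value
     */
    static List<String> stalledConsumers(Map<String, Integer> availablePermitsByConsumer) {
        List<String> stalled = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : availablePermitsByConsumer.entrySet()) {
            if (entry.getValue() == 0) {
                stalled.add(entry.getKey());
            }
        }
        return stalled;
    }
}
```

A nonzero value means the consumer is ready to be dispatched messages, so such consumers are left out of the result.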
The stats for the topic and its connected producers and consumers can be fetched by using the stats command, specifying the topic by name:
pulsar-admin topics stats \
  persistent://test-tenant/namespace/topic \
  --get-precise-backlog
admin.topics().getStats(persistentTopic, false /* is precise backlog */);