Skip to main content
Version: Next

Manage scalable topics

note

For the concepts behind this feature, see Scalable topics.

This page covers administering scalable topics. To produce and consume, use a client with V5 API support; this page is about creating and operating the topics themselves.

Every operation is available three ways: the pulsar-admin scalable-topics CLI, the REST API under /admin/v2/scalable, and the Java admin client (PulsarAdmin.scalableTopics()). The examples below lead with the CLI; the REST API reference lists every endpoint.

In the commands below, a topic is identified by its tenant/namespace/topic name (without a URL scheme).

Create a scalable topic

A scalable topic is created with an initial number of segments. Start small -- one segment is the default -- and let auto split/merge grow it to fit the load.

bin/pulsar-admin scalable-topics create my-tenant/my-namespace/my-topic --segments 1
OptionDescriptionDefault
-s, --segmentsNumber of initial segments1
-p, --propertyA key=value property; repeat for multiple--

Java admin client:

admin.scalableTopics().createScalableTopic("my-tenant/my-namespace/my-topic", 1);

List scalable topics

List every scalable topic in a namespace:

bin/pulsar-admin scalable-topics list my-tenant/my-namespace

Filter to topics carrying specific properties (repeat -p to AND multiple filters):

bin/pulsar-admin scalable-topics list my-tenant/my-namespace -p team=ingest -p tier=gold

Inspect a scalable topic

Get the topic metadata -- the segment DAG, including each segment's hash range and state:

bin/pulsar-admin scalable-topics get-metadata my-tenant/my-namespace/my-topic

Get aggregated runtime stats:

bin/pulsar-admin scalable-topics stats my-tenant/my-namespace/my-topic

Manage subscriptions

Scalable-topic subscriptions span every segment in the DAG. The admin commands operate on the subscription as a whole.

Reset a subscription to a point in the past (the offset is relative to now -- accepts units such as 30m, 1h, 5d):

bin/pulsar-admin scalable-topics seek my-tenant/my-namespace/my-topic \
--subscription my-sub --time 1h

Skip all undelivered messages on a subscription, across every segment:

bin/pulsar-admin scalable-topics clear-backlog my-tenant/my-namespace/my-topic \
--subscription my-sub

Split and merge segments

Splitting a hot segment and merging cold adjacent segments normally happens automatically (see auto split/merge). The commands below let you trigger them manually -- for testing, or to pre-scale ahead of a known traffic event.

Split one segment into two halves of its hash range:

bin/pulsar-admin scalable-topics split-segment my-tenant/my-namespace/my-topic --segment-id 3

Merge two adjacent segments back into one:

bin/pulsar-admin scalable-topics merge-segments my-tenant/my-namespace/my-topic \
--segment-id-1 3 --segment-id-2 4

Segment IDs come from get-metadata. Merging requires the two segments to own adjacent hash ranges.

Configure auto split/merge

Auto split/merge is on by default: each topic's controller splits any segment whose load crosses the split thresholds and merges adjacent segments that stay cold below the merge thresholds. It is configured at three levels, and the most specific value wins per setting:

  1. Broker defaults in broker.conf (cluster-wide).
  2. Per-namespace override.
  3. Per-topic override.

An override only sets the fields it changes; unset fields inherit from the level above.

Broker defaults (broker.conf)

SettingDescriptionDefault
scalableTopicAutoScaleEnabledMaster switch for auto split/merge. When false, segments change only via manual split-segment / merge-segments.true
scalableTopicMaxSegmentsHard ceiling on active segments; splits stop once reached.64
scalableTopicMinSegmentsHard floor on active segments; merges stop once reached.1
scalableTopicMaxDagDepthMaximum merges in a segment's lineage; bounds split/merge flip-flopping (limits merges only -- splits are unaffected).10
scalableTopicSplitCooldownSecondsMinimum time between automatic splits on a topic (short -- only coalesces a burst of near-simultaneous triggers).60
scalableTopicMergeCooldownSecondsMinimum time between automatic merges on a topic.300
scalableTopicMergeWindowSecondsHow long a segment must stay continuously below every merge threshold before it becomes merge-eligible.300
scalableTopicSplitMsgRateInThresholdInbound messages/second above which a segment is split.10000
scalableTopicSplitBytesRateInThresholdInbound bytes/second above which a segment is split.50000000 (50 MB/s)
scalableTopicSplitMsgRateOutThresholdOutbound (dispatched) messages/second above which a segment is split.50000
scalableTopicSplitBytesRateOutThresholdOutbound bytes/second above which a segment is split.250000000 (250 MB/s)
scalableTopicMergeMsgRateInThresholdInbound messages/second below which a segment counts as cold for merging.1000
scalableTopicMergeBytesRateInThresholdInbound bytes/second below which a segment counts as cold.5000000 (5 MB/s)
scalableTopicMergeMsgRateOutThresholdOutbound messages/second below which a segment counts as cold.5000
scalableTopicMergeBytesRateOutThresholdOutbound bytes/second below which a segment counts as cold.25000000 (25 MB/s)
scalableTopicAutoScaleIntervalSecondsCadence of the controller's periodic traffic-driven evaluation. Consumer-count changes are handled immediately, independent of this interval.60
scalableTopicLoadReportIntervalSecondsHow often a segment-owning broker samples segment load for auto-scaling.10
scalableTopicLoadReportRateChangeThresholdMinimum relative change in a segment's rate (0.25 = 25%) since the last report that triggers a new load record; bounds metadata write volume.0.25
tip

Split thresholds sit well above the corresponding merge thresholds on purpose -- the gap between them is the hysteresis that stops a just-split segment from immediately re-merging. Preserve that ordering when you tune them.

Most of these settings are dynamic: apply them at runtime with pulsar-admin brokers update-dynamic-config without restarting. The two read only at broker startup are scalableTopicAutoScaleIntervalSeconds and scalableTopicLoadReportIntervalSeconds.

Per-namespace and per-topic overrides

Both override levels use the same fields as the broker settings (each optional; unset means inherit): enabled, maxSegments, minSegments, maxDagDepth, splitCooldownSeconds, mergeCooldownSeconds, mergeWindowSeconds, and the eight split*/merge* rate thresholds.

Overrides are set through the Java admin client or REST -- there is no pulsar-admin subcommand for them yet:

AutoScalePolicyOverride override = AutoScalePolicyOverride.builder()
.maxSegments(128)
.splitMsgRateInThreshold(20_000.0)
.build();

// Namespace level -- applies to every scalable topic in the namespace
admin.namespaces().setScalableTopicAutoScalePolicy("my-tenant/my-namespace", override);

// Topic level -- narrowest scope, wins over namespace and broker
admin.scalableTopics().setAutoScalePolicy("my-tenant/my-namespace/my-topic", override);

Read or clear an override with the matching getScalableTopicAutoScalePolicy / removeScalableTopicAutoScalePolicy (namespace) and getAutoScalePolicy / removeAutoScalePolicy (topic) methods.

Disable auto-scaling

To run a topic with manual split/merge only, set scalableTopicAutoScaleEnabled=false cluster-wide, or apply an override with enabled=false at the namespace or topic level. The controller then leaves the layout untouched and you drive it with split-segment / merge-segments.

Migrate a regular topic

An existing partitioned or non-partitioned topic can be migrated in place to a scalable topic, with no data copy:

bin/pulsar-admin scalable-topics migrate my-tenant/my-namespace/my-topic

The migration is rejected if legacy (non-V5) clients are still connected, unless you pass --force. Migration is one-way -- a scalable topic cannot be converted back. A full walkthrough, including the recommended sequence for upgrading clients first, is covered in the migration guide.

Delete a scalable topic

bin/pulsar-admin scalable-topics delete my-tenant/my-namespace/my-topic

Pass --force to delete even when the topic has active subscriptions.

REST API reference

OpenAPI documentation

All endpoints are under /admin/v2/scalable and take tenant, namespace, and (except for list) topic as path parameters.

Method & pathOperation
GET /{tenant}/{namespace}List scalable topics in a namespace
PUT /{tenant}/{namespace}/{topic}Create a scalable topic
GET /{tenant}/{namespace}/{topic}Get topic metadata (segment DAG)
GET /{tenant}/{namespace}/{topic}/statsGet aggregated stats
DELETE /{tenant}/{namespace}/{topic}Delete a scalable topic
POST /{tenant}/{namespace}/{topic}/migrateMigrate a regular topic to scalable
POST /{tenant}/{namespace}/{topic}/split/{segmentId}Split a segment
POST /{tenant}/{namespace}/{topic}/merge/{segmentId1}/{segmentId2}Merge two adjacent segments
GET /{tenant}/{namespace}/{topic}/autoScalePolicyGet the topic's auto split/merge override
POST /{tenant}/{namespace}/{topic}/autoScalePolicySet the topic's auto split/merge override
DELETE /{tenant}/{namespace}/{topic}/autoScalePolicyRemove the topic's auto split/merge override
PUT /{tenant}/{namespace}/{topic}/subscriptions/{subscription}Create a subscription
DELETE /{tenant}/{namespace}/{topic}/subscriptions/{subscription}Delete a subscription
POST /{tenant}/{namespace}/{topic}/subscriptions/{subscription}/seekSeek a subscription to a timestamp
POST /{tenant}/{namespace}/{topic}/subscriptions/{subscription}/skip-allClear a subscription's backlog