Skip to main content
Version: Next

Manage transactions

tip

This page only shows some frequently used operations.

  • For the latest and complete information about Pulsar admin, including commands, flags, descriptions, and more, see Pulsar admin doc

  • For the latest and complete information about REST API, including parameters, responses, samples, and more, see REST API doc.

  • For the latest and complete information about Java admin API, including classes, methods, descriptions, and more, see Java admin API doc.

Transaction resources

GetSlowTransactions

In the production environment, there may be some long-lasting transactions that have never been completed. You can get these slow transactions that have survived over a certain time under a coordinator or all coordinators in the following ways.

pulsar-admin transactions slow-transactions \
-c 1 -t 1s

The following is an example of the returned values.

{
"(0,3)": {
"txnId": "(0,3)",
"status": "OPEN",
"openTimestamp": 1658120122474,
"timeoutAt": 300000,
"producedPartitions": {},
"ackedPartitions": {}
},
"(0,2)": {
"txnId": "(0,2)",
"status": "OPEN",
"openTimestamp": 1658120122471,
"timeoutAt": 300000,
"producedPartitions": {},
"ackedPartitions": {}
},
"(0,5)": {
"txnId": "(0,5)",
"status": "OPEN",
"openTimestamp": 1658120122478,
"timeoutAt": 300000,
"producedPartitions": {},
"ackedPartitions": {}
},
"(0,4)": {
"txnId": "(0,4)",
"status": "OPEN",
"openTimestamp": 1658120122476,
"timeoutAt": 300000,
"producedPartitions": {},
"ackedPartitions": {}
},
"(0,7)": {
"txnId": "(0,7)",
"status": "OPEN",
"openTimestamp": 1658120122482,
"timeoutAt": 300000,
"producedPartitions": {},
"ackedPartitions": {}
},
"(0,10)": {
"txnId": "(0,10)",
"status": "OPEN",
"openTimestamp": 1658120122488,
"timeoutAt": 300000,
"producedPartitions": {},
"ackedPartitions": {}
},
"(0,6)": {
"txnId": "(0,6)",
"status": "OPEN",
"openTimestamp": 1658120122480,
"timeoutAt": 300000,
"producedPartitions": {},
"ackedPartitions": {}
},
"(0,9)": {
"txnId": "(0,9)",
"status": "OPEN",
"openTimestamp": 1658120122486,
"timeoutAt": 300000,
"producedPartitions": {},
"ackedPartitions": {}
},
"(0,8)": {
"txnId": "(0,8)",
"status": "OPEN",
"openTimestamp": 1658120122484,
"timeoutAt": 300000,
"producedPartitions": {},
"ackedPartitions": {}
},
"(0,11)": {
"txnId": "(0,11)",
"status": "OPEN",
"openTimestamp": 1658120122490,
"timeoutAt": 300000,
"producedPartitions": {},
"ackedPartitions": {}
}
}

ScaleTransactionCoordinators

When the performance of transactions reaches a bottleneck due to the insufficient number of transaction coordinators, you can scale the number of the transaction coordinators in the following ways.

pulsar-admin transactions scale-transactionCoordinators \
-r 17

Transaction stats

Get transaction metadata

The transaction metadata that can be retrieved include:

  • txnId: The ID of this transaction.
  • status: The status of this transaction.
  • openTimestamp: The open time of this transaction.
  • timeoutAt: The timeout of this transaction.
  • producedPartitions: The partitions or topics that messages have been sent to with this transaction.
  • ackedPartitions: The partitions or topics where messages have been acknowledged with this transaction.

Use one of the following ways to get your transaction metadata.

pulsar-admin transactions transaction-metadata\
-m 1 -l 1

The following is an example of the returned values.

{
"txnId" : "(1,18)",
"status" : "ABORTING",
"openTimestamp" : 1656592983374,
"timeoutAt" : 5000,
"producedPartitions" : {
"my-topic" : {
"startPosition" : "127:4959",
"aborted" : true
}
},
"ackedPartitions" : {
"my-topic" : {
"mysubName" : {
"cumulativeAckPosition" : null
}
}
}
}

Get transaction stats in transaction pending ack

The transaction stats in transaction pending ack that can be retrieved include:

  • cumulativeAckPosition: The position that this transaction cumulatively acknowledges in this subscription.

Use one of the following ways to get transaction stats in pending ack:

pulsar-admin transactions transaction-in-pending-ack-stats \
-m 1 -l 1 -t my-topic -s mysubname

The following is an example of the returned value.

{
"cumulativeAckPosition" : "137:49959"
}

Get transaction stats in transaction buffer

The transaction stats in the transaction buffer that can be retrieved include:

  • startPosition: The start position of this transaction in the transaction buffer.
  • aborted: The flag of whether this transaction has been aborted.

Use one of the following ways to get transaction stats in transaction buffer:

pulsar-admin transactions transaction-in-buffer-stats \
-m 1 -l 1 -t my-topic

The following is an example of the returned values.

{
"startPosition" : "137:49759",
"aborted" : false
}

Transaction coordinator stats

The transaction coordinator (TC) is a module inside a Pulsar broker. It maintains the entire life cycle of transactions and handles transaction timeout.

Get coordinator stats

The transaction coordinator stats that can be retrieved include:

  • state: The state of this transaction coordinator.
  • leastSigBit:s The sequence ID of this transaction coordinator.
  • lowWaterMark: The low watermark of this transaction coordinator.
  • ongoingTxnSize: The total number of ongoing transactions in this transaction coordinator.
  • recoverStartTime: The start timestamp of transaction coordinator recovery. 0L means no startup.
  • recoverEndTime: The end timestamp of transaction coordinator recovery. 0L means no startup.

Use one of the following ways to get transaction coordinator stats:

pulsar-admin transactions coordinator-stats \
-c 1

The following is an example of the returned values.

{
"state" : "Ready",
"leastSigBits" : 1,
"lowWaterMark" : 0,
"ongoingTxnSize" : 0,
"recoverStartTime" : 1657021892377,
"recoverEndTime" : 1657021892378
}

Get coordinator internal stats

The coordinator's internal stats that can be retrieved include:

  • transactionLogStats: The stats of the transaction coordinator log.
  • managedLedgerName: The name of the managed ledger where the transaction coordinator log is stored.
  • managedLedgerInternalStats: The internal stats of the managed ledger where the transaction coordinator log is stored. See [managedLedgerInternalStats](/docs/next/admin-api-topics#get-internal-stats) for more details.

Use one of the following ways to get coordinator’s internal stats:

pulsar-admin transactions coordinator-internal-stats \
-c 1 -m

The following is an example of the returned values.

{
"transactionLogStats" : {
"managedLedgerName" : "pulsar/system/persistent/__transaction_log_1",
"managedLedgerInternalStats" : {
"entriesAddedCounter" : 3,
"numberOfEntries" : 3,
"totalSize" : 63,
"currentLedgerEntries" : 3,
"currentLedgerSize" : 63,
"lastLedgerCreatedTimestamp" : "2022-06-30T18:18:05.88+08:00",
"waitingCursorsCount" : 0,
"pendingAddEntriesCount" : 0,
"lastConfirmedEntry" : "13:2",
"state" : "LedgerOpened",
"ledgers" : [ {
"ledgerId" : 13,
"entries" : 0,
"size" : 0,
"offloaded" : false,
"metadata" : "LedgerMetadata{formatVersion=3, ensembleSize=1, writeQuorumSize=1, ackQuorumSize=1, state=CLOSED, length=63, lastEntryId=2, digestType=CRC32C, password=OMITTED, ensembles={0=[10.20.240.119:3181]}, customMetadata={component=base64:bWFuYWdlZC1sZWRnZXI=, pulsar/managed-ledger=base64:cHVsc2FyL3N5c3RlbS9wZXJzaXN0ZW50L19fdHJhbnNhY3Rpb25fbG9nXzE=, application=base64:cHVsc2Fy}}",
"underReplicated" : false
} ],
"cursors" : {
"transaction.subscription" : {
"markDeletePosition" : "13:2",
"readPosition" : "13:3",
"waitingReadOp" : false,
"pendingReadOps" : 0,
"messagesConsumedCounter" : 3,
"cursorLedger" : 22,
"cursorLedgerLastEntry" : 1,
"individuallyDeletedMessages" : "[]",
"lastLedgerSwitchTimestamp" : "2022-06-30T18:18:05.932+08:00",
"state" : "Open",
"numberOfEntriesSinceFirstNotAckedMessage" : 1,
"totalNonContiguousDeletedMessagesRange" : 0,
"subscriptionHavePendingRead" : false,
"subscriptionHavePendingReplayRead" : false,
"properties" : { }
}
}
}
}
}

Transaction pending ack stats

Pending ack maintains message acknowledgments within a transaction before a transaction completes. If a message is in the pending acknowledge state, the message cannot be acknowledged by other transactions until the message is removed from the pending acknowledge state.

Get transaction pending ack stats

The transaction pending ack state stats that can be retrieved include:

  • state: The state of this transaction coordinator.
  • lowWaterMark: The low watermark of this transaction coordinator.
  • ongoingTxnSize: The total number of ongoing transactions in this transaction coordinator.
  • recoverStartTime: The start timestamp of transaction pendingAck recovery. 0L means no startup.
  • recoverEndTime: The end timestamp of transaction pendingAck recovery. 0L means no startup.

Use one of the following ways to get transaction pending ack stats:

pulsar-admin transactions pending-ack-stats \
-t my-topic -s mysubName -l

The following is an example of the returned values.

{
"state" : "Ready",
"lowWaterMarks" : {
"1" : 0
},
"ongoingTxnSize" : 1,
"recoverStartTime" : 1657021899202,
"recoverEndTime" : 1657021899203
}

Get transaction pending ack internal stats

The transaction pending ack internal stats that can be retrieved include:

  • transactionLogStats: The stats of the transaction pending ack log.
  • managedLedgerName: The name of the managed ledger where the transaction pending ack log is stored.
  • managedLedgerInternalStats: The internal stats of the managed ledger where the transaction coordinator log is stored. See [managedLedgerInternalStats](/docs/next/admin-api-topics#get-internal-stats) for more details.

Use one of the following ways to get transaction pending ack internal stats:

pulsar-admin transactions pending-ack-internal-stats \
-t my-topic -s mysubName -m

The following is an example of the returned values.

{
"pendingAckLogStats" : {
"managedLedgerName" : "public/default/persistent/my-topic-mysubName__transaction_pending_ack",
"managedLedgerInternalStats" : {
"entriesAddedCounter" : 2247,
"numberOfEntries" : 2247,
"totalSize" : 37212,
"currentLedgerEntries" : 104,
"currentLedgerSize" : 1732,
"lastLedgerCreatedTimestamp" : "2022-06-30T19:02:09.746+08:00",
"waitingCursorsCount" : 0,
"pendingAddEntriesCount" : 52,
"lastConfirmedEntry" : "64:51",
"state" : "LedgerOpened",
"ledgers" : [ {
"ledgerId" : 56,
"entries" : 2195,
"size" : 36346,
"offloaded" : false,
"metadata" : "LedgerMetadata{formatVersion=3, ensembleSize=1, writeQuorumSize=1, ackQuorumSize=1, state=CLOSED, length=36346, lastEntryId=2194, digestType=CRC32C, password=OMITTED, ensembles={0=[10.20.240.119:3181]}, customMetadata={component=base64:bWFuYWdlZC1sZWRnZXI=, pulsar/managed-ledger=base64:cHVibGljL2RlZmF1bHQvcGVyc2lzdGVudC9teS10b3BpYy1teXN1Yk5hbWVfX3RyYW5zYWN0aW9uX3BlbmRpbmdfYWNr, application=base64:cHVsc2Fy}}",
"underReplicated" : false
}, {
"ledgerId" : 64,
"entries" : 0,
"size" : 0,
"offloaded" : false,
"metadata" : "LedgerMetadata{formatVersion=3, ensembleSize=1, writeQuorumSize=1, ackQuorumSize=1, state=CLOSED, length=866, lastEntryId=51, digestType=CRC32C, password=OMITTED, ensembles={0=[10.20.240.119:3181]}, customMetadata={component=base64:bWFuYWdlZC1sZWRnZXI=, pulsar/managed-ledger=base64:cHVibGljL2RlZmF1bHQvcGVyc2lzdGVudC9teS10b3BpYy1teXN1Yk5hbWVfX3RyYW5zYWN0aW9uX3BlbmRpbmdfYWNr, application=base64:cHVsc2Fy}}",
"underReplicated" : false
} ],
"cursors" : {
"__pending_ack_state" : {
"markDeletePosition" : "56:-1",
"readPosition" : "56:0",
"waitingReadOp" : false,
"pendingReadOps" : 0,
"messagesConsumedCounter" : 0,
"cursorLedger" : 57,
"cursorLedgerLastEntry" : 0,
"individuallyDeletedMessages" : "[]",
"lastLedgerSwitchTimestamp" : "2022-06-30T18:55:26.842+08:00",
"state" : "Open",
"numberOfEntriesSinceFirstNotAckedMessage" : 1,
"totalNonContiguousDeletedMessagesRange" : 0,
"subscriptionHavePendingRead" : false,
"subscriptionHavePendingReplayRead" : false,
"properties" : { }
}
}
}
}
}

Get position stats in pending ack

The position stats in pending ack include:

  • PendingAck The position is in pending ack stats.
  • MarkDelete The position is already acknowledged.
  • NotInPendingAck The position is not acknowledged within a transaction.
  • PendingAckNotReady The pending ack has not been initialized.
  • InvalidPosition The position is invalid, for example, batch index > batch size.

If you want to know whether the position has been acknowledged, you can use one of the following ways to get position stats pending ack:

pulsar-admin transactions position-stats-in-pending-ack \
-t my-topic -s mysubName -l 15 -e 6

The following is an example of the returned values.

{
"State" : "MarkDelete"
}

Transaction buffer stats

Transaction buffer handles messages produced to a topic partition within a transaction. The messages in the transaction buffer are not visible to consumers until the transactions are committed. The messages in the transaction buffer are discarded when the transactions are aborted.

Get transaction buffer stats

The transaction buffer stats that can be retrieved include:

  • state: The state of this transaction buffer.
  • maxReadPosition: The maximum read position of this transaction buffer.
  • lastSnapshotTimestamps: The last snapshot timestamp of this transaction buffer.
  • lowWaterMarks (Optional): The low watermark details of the transaction buffer.
  • ongoingTxnSize: The total number of ongoing transactions in this transaction buffer.
  • recoverStartTime: The start timestamp of transaction buffer recovery. 0L means no startup.
  • recoverEndTime: The end timestamp of transaction buffer recovery. 0L means no startup.

Use one of the following ways to get transaction buffer stats:

pulsar-admin transactions transaction-buffer-stats \
-t my-topic -l

The following is an example of the returned values.

{
"state" : "Ready",
"maxReadPosition" : "38:101",
"lastSnapshotTimestamps" : 1657021903534,
"lowWaterMarks" : {
"1" : -1,
"2" : -1
},
"ongoingTxnSize" : 0,
"recoverStartTime" : 1657021892850,
"recoverEndTime" : 1657021893372
}