Pulsar binary protocol specification


Pulsar uses a custom binary protocol for communications between producers/consumers and brokers. This protocol is designed to support required features, such as acknowledgements and flow control, while ensuring maximum transport and implementation efficiency.

Clients and brokers exchange commands with each other. Commands are formatted as binary protocol buffer (aka protobuf) messages. The format of protobuf commands is specified in the PulsarApi.proto file and also documented in the Protobuf interface section below.

Connection sharing

Commands for different producers and consumers can be interleaved and sent through the same connection without restriction.

All commands associated with Pulsar’s protocol are contained in a BaseCommand protobuf message that includes a Type enum with all possible subcommands as optional fields. BaseCommand messages can specify only one subcommand.

Framing

Since protobuf doesn’t provide any sort of message frame, all messages in the Pulsar protocol are prepended with a 4-byte field that specifies the size of the frame. The maximum allowable size of a single frame is 5 MB.

The Pulsar protocol allows for two types of commands:

  1. Simple commands that do not carry a message payload.
  2. Payload commands that bear a payload that is used when publishing or delivering messages. In payload commands, the protobuf command data is followed by protobuf metadata and then the payload, which is passed in raw format outside of protobuf. All sizes are passed as 4-byte unsigned big endian integers.

Message payloads are passed in raw format rather than protobuf format for efficiency reasons.

Simple commands

Simple (payload-free) commands have this basic structure:

Component Description Size (in bytes)
totalSize The size of the frame, counting everything that comes after it (in bytes) 4
commandSize The size of the protobuf-serialized command 4
message The protobuf message serialized in a raw binary format (rather than in protobuf format)  

Payload commands

Payload commands have this basic structure:

Component Description Size (in bytes)
totalSize The size of the frame, counting everything that comes after it (in bytes) 4
commandSize The size of the protobuf-serialized command 4
message The protobuf message serialized in a raw binary format (rather than in protobuf format)  
magicNumber A 2-byte byte array (0x0e01) identifying the current format 2
checksum A CRC32-C checksum of everything that comes after it 4
metadataSize The size of the message metadata 4
metadata The message metadata stored as a binary protobuf message  
payload Anything left in the frame is considered the payload and can include any sequence of bytes  

Message metadata

Message metadata is stored alongside the application-specified payload as a serialized protobuf message. Metadata is created by the producer and passed on unchanged to the consumer.

Field Description
producer_name The name of the producer that published the message
sequence_id The sequence ID of the message, assigned by producer
publish_time The publish timestamp in Unix time (i.e. as the number of milliseconds since January 1st, 1970 in UTC)
properties A sequence of key/value pairs (using the KeyValue message). These are application-defined keys and values with no special meaning to Pulsar.
replicated_from (optional) Indicates that the message has been replicated and specifies the name of the cluster where the message was originally published
partition_key (optional) While publishing on a partition topic, if the key is present, the hash of the key is used to determine which partition to choose
compression (optional) Signals that payload has been compressed and with which compression library
uncompressed_size (optional) If compression is used, the producer must fill the uncompressed size field with the original payload size
num_messages_in_batch (optional) If this message is really a batch of multiple entries, this field must be set to the number of messages in the batch

Batch messages

When using batch messages, the payload will be containing a list of entries, each of them with its individual metadata, defined by the SingleMessageMetadata object.

For a single batch, the payload format will look like this:

Field Description
metadataSizeN The size of the single message metadata serialized Protobuf
metadataN Single message metadata
payloadN Message payload passed by application

Each metadata field looks like this;

Field Description
properties Application-defined properties
partition key (optional) Key to indicate the hashing to a particular partition
payload_size Size of the payload for the single message in the batch

When compression is enabled, the whole batch will be compressed at once.

Interactions

Connection establishment

After opening a TCP connection to a broker, typically on port 6650, the client is responsible to initiate the session.

Connect interaction

After receiving a Connected response from the broker, the client can consider the connection ready to use. Alternatively, if the broker doesn’t validate the client authentication, it will reply with an Error command and close the TCP connection.

Example:

message CommandConnect {
  "client_version" : "Pulsar-Client-Java-v1.15.2",
  "auth_method_name" : "my-authentication-plugin",
  "auth_data" : "my-auth-data",
  "protocol_version" : 6
}

Fields:

  • client_version → String based identifier. Format is not enforced
  • auth_method_name(optional) Name of the authentication plugin if auth enabled
  • auth_data(optional) Plugin specific authentication data
  • protocol_version → Indicates the protocol version supported by the client. Broker will not send commands introduced in newer revisions of the protocol. Broker might be enforcing a minimum version
message CommandConnected {
  "server_version" : "Pulsar-Broker-v1.15.2",
  "protocol_version" : 6
}

Fields:

  • server_version → String identifier of broker version
  • protocol_version → Protocol version supported by the broker. Client must not attempt to send commands introduced in newer revisions of the protocol

Keep Alive

To identify prolonged network partitions between clients and brokers or cases in which a machine crashes without interrupting the TCP connection on the remote end (eg: power outage, kernel panic, hard reboot…), we have introduced a mechanism to probe for the availability status of the remote peer.

Both clients and brokers are sending Ping commands periodically and they will close the socket if a Pong response is not received within a timeout (default used by broker is 60s).

A valid implementation of a Pulsar client is not required to send the Ping probe, though it is required to promptly reply after receiving one from the broker in order to prevent the remote side from forcibly closing the TCP connection.

Producer

In order to send messages, a client needs to establish a producer. When creating a producer, the broker will first verify that this particular client is authorized to publish on the topic.

Once the client gets confirmation of the producer creation, it can publish messages to the broker, referring to the producer id negotiated before.

Producer interaction

Command Producer
message CommandProducer {
  "topic" : "persistent://my-property/my-cluster/my-namespace/my-topic",
  "producer_id" : 1,
  "request_id" : 1
}

Parameters:

  • topic → Complete topic name to where you want to create the producer on
  • producer_id → Client generated producer identifier. Needs to be unique within the same connection
  • request_id → Identifier for this request. Used to match the response with the originating request. Needs to be unique within the same connection
  • producer_name(optional) If a producer name is specified, the name will be used, otherwise the broker will generate a unique name. Generated producer name is guaranteed to be globally unique. Implementations are expected to let the broker generate a new producer name when the producer is initially created, then reuse it when recreating the producer after reconnections.

The broker will reply with either ProducerSuccess or Error commands.

Command ProducerSuccess
message CommandProducerSuccess {
  "request_id" :  1,
  "producer_name" : "generated-unique-producer-name"
}

Parameters:

  • request_id → Original id of the CreateProducer request
  • producer_name → Generated globally unique producer name or the name specified by the client, if any.
Command Send

Command Send is used to publish a new message within the context of an already existing producer. This command is used in a frame that includes command as well as message payload, for which the complete format is specified in the payload commands section.

message CommandSend {
  "producer_id" : 1,
  "sequence_id" : 0,
  "num_messages" : 1
}

Parameters:

  • producer_id → id of an existing producer
  • sequence_id → each message has an associated sequence id which is expected to be implemented with a counter starting at 0. The SendReceipt that acknowledges the effective publishing of a messages will refer to it by its sequence id.
  • num_messages(optional) Used when publishing a batch of messages at once.
Command SendReceipt

After a message has been persisted on the configured number of replicas, the broker will send the acknowledgment receipt to the producer.

message CommandSendReceipt {
  "producer_id" : 1,
  "sequence_id" : 0,
  "message_id" : {
    "ledgerId" : 123,
    "entryId" : 456
  }
}

Parameters:

  • producer_id → id of producer originating the send request
  • sequence_id → sequence id of the published message
  • message_id → message id assigned by the system to the published message Unique within a single cluster. Message id is composed of 2 longs, ledgerId and entryId, that reflect that this unique id is assigned when appending to a BookKeeper ledger
Command CloseProducer

Note: This command can be sent by either producer or broker.

When receiving a CloseProducer command, the broker will stop accepting any more messages for the producer, wait until all pending messages are persisted and then reply Success to the client.

The broker can send a CloseProducer command to client when it’s performing a graceful failover (eg: broker is being restarted, or the topic is being unloaded by load balancer to be transferred to a different broker).

When receiving the CloseProducer, the client is expected to go through the service discovery lookup again and recreate the producer again. The TCP connection is not affected.

Consumer

A consumer is used to attach to a subscription and consume messages from it. After every reconnection, a client needs to subscribe to the topic. If a subscription is not already there, a new one will be created.

Consumer

Flow control

After the consumer is ready, the client needs to give permission to the broker to push messages. This is done with the Flow command.

A Flow command gives additional permits to send messages to the consumer. A typical consumer implementation will use a queue to accumulate these messages before the application is ready to consume them.

After the application has dequeued a number of message, the consumer will send additional number of permits to allow the broker to push more messages.

Command Subscribe
message CommandSubscribe {
  "topic" : "persistent://my-property/my-cluster/my-namespace/my-topic",
  "subscription" : "my-subscription-name",
  "subType" : "Exclusive",
  "consumer_id" : 1,
  "request_id" : 1
}

Parameters:

  • topic → Complete topic name to where you want to create the consumer on
  • subscription → Subscription name
  • subType → Subscription type: Exclusive, Shared, Failover
  • consumer_id → Client generated consumer identifier. Needs to be unique within the same connection
  • request_id → Identifier for this request. Used to match the response with the originating request. Needs to be unique within the same connection
  • consumer_name(optional) Clients can specify a consumer name. This name can be used to track a particular consumer in the stats. Also, in Failover subscription type, the name is used to decide which consumer is elected as master (the one receiving messages): consumers are sorted by their consumer name and the first one is elected master.
Command Flow
message CommandFlow {
  "consumer_id" : 1,
  "messagePermits" : 1000
}

Parameters:

  • consumer_id → Id of an already established consumer
  • messagePermits → Number of additional permits to grant to the broker for pushing more messages
Command Message

Command Message is used by the broker to push messages to an existing consumer, within the limits of the given permits.

This command is used in a frame that includes the message payload as well, for which the complete format is specified in the payload commands section.

message CommandMessage {
  "consumer_id" : 1,
  "message_id" : {
    "ledgerId" : 123,
    "entryId" : 456
  }
}
Command Ack

An Ack is used to signal to the broker that a given message has been successfully processed by the application and can be discarded by the broker.

In addition, the broker will also maintain the consumer position based on the acknowledged messages.

message CommandAck {
  "consumer_id" : 1,
  "ack_type" : "Individual",
  "message_id" : {
    "ledgerId" : 123,
    "entryId" : 456
  }
}

Parameters:

  • consumer_id → Id of an already established consumer
  • ack_type → Type of acknowledgment: Individual or Cumulative
  • message_id → Id of the message to acknowledge
  • validation_error(optional) Indicates that the consumer has discarded the messages due to: UncompressedSizeCorruption, DecompressionError, ChecksumMismatch, BatchDeSerializeError
Command CloseConsumer

Note: This command can be sent by either producer or broker.

This command behaves the same as CloseProducer

Command RedeliverUnacknowledgedMessages

A consumer can ask the broker to redeliver some or all of the pending messages that were pushed to that particular consumer and not yet acknowledged.

The protobuf object accepts a list of message ids that the consumer wants to be redelivered. The message ids will be honored by the broker only if the subscription type is shared. For other subscription types or if the list is empty, the broker will redeliver all the pending messages.

On redelivery, messages can be sent to the same consumer or, in the case of a shared subscription, spread across all available consumers.

Command ReachedEndOfTopic

This is sent by a broker to a particular consumer, whenever the topic has been “terminated” and all the messages on the subscription were acknowledged.

The client should use this command to notify the application that no more messages are coming from the consumer.

Command ConsumerStats

This command is sent by the client to retrieve Subscriber and Consumer level stats from the broker. Parameters:

  • request_id → Id of the request, used to correlate the request and the response.
  • consumer_id → Id of an already established consumer.
Command ConsumerStatsResponse

This is the broker’s response to ConsumerStats request by the client. It contains the Subscriber and Consumer level stats of the consumer_id sent in the request. If the error_code or the error_message field is set it indicates that the request has failed.

Command Unsubscribe

This command is sent by the client to unsubscribe the consumer_id from the associated topic. Parameters:

  • request_id → Id of the request.
  • consumer_id → Id of an already established consumer which needs to unsubscribe.

Service discovery

Topic lookup

Topic lookup needs to be performed each time a client needs to create or reconnect a producer or a consumer. Lookup is used to discover which particular broker is serving the topic we are about to use.

Lookup can be done with a REST call as described in the admin API docs.

Since Pulsar-1.16 it is also possible to perform the lookup within the binary protocol.

For the sake of example, let’s assume we have a service discovery component running at pulsar://broker.example.com:6650

Individual brokers will be running at pulsar://broker-1.example.com:6650, pulsar://broker-2.example.com:6650, …

A client can use a connection to the discovery service host to issue a LookupTopic command. The response can either be a broker hostname to connect to, or a broker hostname to which retry the lookup.

The LookupTopic command has to be used in a connection that has already gone through the Connect / Connected initial handshake.

Topic lookup

message CommandLookupTopic {
  "topic" : "persistent://my-property/my-cluster/my-namespace/my-topic",
  "request_id" : 1,
  "authoritative" : false
}

Fields:

  • topic → Topic name to lookup
  • request_id → Id of the request that will be passed with its response
  • authoritative → Initial lookup request should use false. When following a redirect response, client should pass the same value contained in the response
LookupTopicResponse

Example of response with successful lookup:

message CommandLookupTopicResponse {
  "request_id" : 1,
  "response" : "Connect",
  "brokerServiceUrl" : "pulsar://broker-1.example.com:6650",
  "brokerServiceUrlTls" : "pulsar+ssl://broker-1.example.com:6651",
  "authoritative" : true
}

Example of lookup response with redirection:

message CommandLookupTopicResponse {
  "request_id" : 1,
  "response" : "Redirect",
  "brokerServiceUrl" : "pulsar://broker-2.example.com:6650",
  "brokerServiceUrlTls" : "pulsar+ssl://broker-2.example.com:6651",
  "authoritative" : true
}

In this second case, we need to reissue the LookupTopic command request to broker-2.example.com and this broker will be able to give a definitive answer to the lookup request.

Partitioned topics discovery

Partitioned topics metadata discovery is used to find out if a topic is a “partitioned topic” and how many partitions were set up.

If the topic is marked as “partitioned”, the client is expected to create multiple producers or consumers, one for each partition, using the partition-X suffix.

This information only needs to be retrieved the first time a producer or consumer is created. There is no need to do this after reconnections.

The discovery of partitioned topics metadata works very similar to the topic lookup. The client send a request to the service discovery address and the response will contain actual metadata.

Command PartitionedTopicMetadata
message CommandPartitionedTopicMetadata {
  "topic" : "persistent://my-property/my-cluster/my-namespace/my-topic",
  "request_id" : 1
}

Fields:

  • topic → the topic for which to check the partitions metadata
  • request_id → Id of the request that will be passed with its response
Command PartitionedTopicMetadataResponse

Example of response with metadata:

message CommandPartitionedTopicMetadataResponse {
  "request_id" : 1,
  "response" : "Success",
  "partitions" : 32
}

Protobuf interface

Protobuf messages

BaseCommand

pulsar.proto.BaseCommand

Fields
Name Type Label Default Description
type Type required
connect CommandConnect optional
connected CommandConnected optional
subscribe CommandSubscribe optional
producer CommandProducer optional
send CommandSend optional
send_receipt CommandSendReceipt optional
send_error CommandSendError optional
message CommandMessage optional
ack CommandAck optional
flow CommandFlow optional
unsubscribe CommandUnsubscribe optional
success CommandSuccess optional
error CommandError optional
close_producer CommandCloseProducer optional
close_consumer CommandCloseConsumer optional
producer_success CommandProducerSuccess optional
ping CommandPing optional
pong CommandPong optional
redeliverUnacknowledgedMessages CommandRedeliverUnacknowledgedMessages optional
partitionMetadata CommandPartitionedTopicMetadata optional
partitionMetadataResponse CommandPartitionedTopicMetadataResponse optional
lookupTopic CommandLookupTopic optional
lookupTopicResponse CommandLookupTopicResponse optional
consumerStats CommandConsumerStats optional
consumerStatsResponse CommandConsumerStatsResponse optional
reachedEndOfTopic CommandReachedEndOfTopic optional
seek CommandSeek optional
getLastMessageId CommandGetLastMessageId optional
getLastMessageIdResponse CommandGetLastMessageIdResponse optional
active_consumer_change CommandActiveConsumerChange optional
getTopicsOfNamespace CommandGetTopicsOfNamespace optional
getTopicsOfNamespaceResponse CommandGetTopicsOfNamespaceResponse optional
getSchema CommandGetSchema optional
getSchemaResponse CommandGetSchemaResponse optional
CommandAck

pulsar.proto.CommandAck

Fields
Name Type Label Default Description
consumer_id uint64 required
ack_type AckType required
message_id MessageIdData repeated In case of individual acks, the client can pass a list of message ids
validation_error ValidationError optional
properties KeyLongValue repeated
CommandActiveConsumerChange

pulsar.proto.CommandActiveConsumerChange

changes on active consumer

Fields
Name Type Label Default Description
consumer_id uint64 required
is_active bool optional false
CommandCloseConsumer

pulsar.proto.CommandCloseConsumer

Fields
Name Type Label Default Description
consumer_id uint64 required
request_id uint64 required
CommandCloseProducer

pulsar.proto.CommandCloseProducer

Fields
Name Type Label Default Description
producer_id uint64 required
request_id uint64 required
CommandConnect

pulsar.proto.CommandConnect

Fields
Name Type Label Default Description
client_version string required
auth_method AuthMethod optional Deprecated. Use "auth_method_name" instead.
auth_method_name string optional
auth_data bytes optional
protocol_version int32 optional 0
proxy_to_broker_url string optional Client can ask to be proxyied to a specific broker This is only honored by a Pulsar proxy
original_principal string optional Original principal that was verified by a Pulsar proxy. In this case the auth info above will be the auth of the proxy itself
original_auth_data string optional Original auth role and auth Method that was passed to the proxy. In this case the auth info above will be the auth of the proxy itself
original_auth_method string optional
CommandConnected

pulsar.proto.CommandConnected

Fields
Name Type Label Default Description
server_version string required
protocol_version int32 optional 0
CommandConsumerStats

pulsar.proto.CommandConsumerStats

Fields
Name Type Label Default Description
request_id uint64 required
consumer_id uint64 required required string topic_name = 2; required string subscription_name = 3;
CommandConsumerStatsResponse

pulsar.proto.CommandConsumerStatsResponse

Fields
Name Type Label Default Description
request_id uint64 required
error_code ServerError optional
error_message string optional
msgRateOut double optional Total rate of messages delivered to the consumer. msg/s
msgThroughputOut double optional Total throughput delivered to the consumer. bytes/s
msgRateRedeliver double optional Total rate of messages redelivered by this consumer. msg/s
consumerName string optional Name of the consumer
availablePermits uint64 optional Number of available message permits for the consumer
unackedMessages uint64 optional Number of unacknowledged messages for the consumer
blockedConsumerOnUnackedMsgs bool optional Flag to verify if consumer is blocked due to reaching threshold of unacked messages
address string optional Address of this consumer
connectedSince string optional Timestamp of connection
type string optional Whether this subscription is Exclusive or Shared or Failover
msgRateExpired double optional Total rate of messages expired on this subscription. msg/s
msgBacklog uint64 optional Number of messages in the subscription backlog
CommandError

pulsar.proto.CommandError

Fields
Name Type Label Default Description
request_id uint64 required
error ServerError required
message string required
CommandFlow

pulsar.proto.CommandFlow

Fields
Name Type Label Default Description
consumer_id uint64 required
messagePermits uint32 required Max number of messages to prefetch, in addition of any number previously specified
CommandGetLastMessageId

pulsar.proto.CommandGetLastMessageId

Fields
Name Type Label Default Description
consumer_id uint64 required
request_id uint64 required
CommandGetLastMessageIdResponse

pulsar.proto.CommandGetLastMessageIdResponse

Fields
Name Type Label Default Description
last_message_id MessageIdData required
request_id uint64 required
CommandGetSchema

pulsar.proto.CommandGetSchema

Fields
Name Type Label Default Description
request_id uint64 required
topic string required
schema_version bytes optional
CommandGetSchemaResponse

pulsar.proto.CommandGetSchemaResponse

Fields
Name Type Label Default Description
request_id uint64 required
error_code ServerError optional
error_message string optional
schema Schema optional
schema_version bytes optional
CommandGetTopicsOfNamespace

pulsar.proto.CommandGetTopicsOfNamespace

Fields
Name Type Label Default Description
request_id uint64 required
namespace string required
mode Mode optional PERSISTENT
CommandGetTopicsOfNamespaceResponse

pulsar.proto.CommandGetTopicsOfNamespaceResponse

Fields
Name Type Label Default Description
request_id uint64 required
topics string repeated
CommandLookupTopic

pulsar.proto.CommandLookupTopic

Fields
Name Type Label Default Description
topic string required
request_id uint64 required
authoritative bool optional false
original_principal string optional TODO - Remove original_principal, original_auth_data, original_auth_method Original principal that was verified by a Pulsar proxy.
original_auth_data string optional Original auth role and auth Method that was passed to the proxy.
original_auth_method string optional
CommandLookupTopicResponse

pulsar.proto.CommandLookupTopicResponse

Fields
Name Type Label Default Description
brokerServiceUrl string optional Optional in case of error
brokerServiceUrlTls string optional
response LookupType optional
request_id uint64 required
authoritative bool optional false
error ServerError optional
message string optional
proxy_through_service_url bool optional false If it's true, indicates to the client that it must always connect through the service url after the lookup has been completed.
CommandMessage

pulsar.proto.CommandMessage

Fields
Name Type Label Default Description
consumer_id uint64 required
message_id MessageIdData required
redelivery_count uint32 optional 0
CommandPartitionedTopicMetadata

pulsar.proto.CommandPartitionedTopicMetadata

Fields
Name Type Label Default Description
topic string required
request_id uint64 required
original_principal string optional TODO - Remove original_principal, original_auth_data, original_auth_method Original principal that was verified by a Pulsar proxy.
original_auth_data string optional Original auth role and auth Method that was passed to the proxy.
original_auth_method string optional
CommandPartitionedTopicMetadataResponse

pulsar.proto.CommandPartitionedTopicMetadataResponse

Fields
Name Type Label Default Description
partitions uint32 optional Optional in case of error
request_id uint64 required
response LookupType optional
error ServerError optional
message string optional
CommandPing

pulsar.proto.CommandPing

Commands to probe the state of connection. When either client or broker doesn't receive commands for certain amount of time, they will send a Ping probe.

This message has no fields.

CommandPong

pulsar.proto.CommandPong

This message has no fields.

CommandProducer

pulsar.proto.CommandProducer

Create a new Producer on a topic, assigning the given producer_id, / all messages sent with this producer_id will be persisted on the topic

Fields
Name Type Label Default Description
topic string required
producer_id uint64 required
request_id uint64 required
producer_name string optional If a producer name is specified, the name will be used, / otherwise the broker will generate a unique name
encrypted bool optional false
metadata KeyValue repeated Add optional metadata key=value to this producer
schema Schema optional
CommandProducerSuccess

pulsar.proto.CommandProducerSuccess

Response from CommandProducer

Fields
Name Type Label Default Description
request_id uint64 required
producer_name string required
last_sequence_id int64 optional -1 The last sequence id that was stored by this producer in the previous session This will only be meaningful if deduplication has been enabled.
schema_version bytes optional
CommandReachedEndOfTopic

pulsar.proto.CommandReachedEndOfTopic

Message sent by broker to client when a topic has been forcefully terminated and there are no more messages left to consume

Fields
Name Type Label Default Description
consumer_id uint64 required
CommandRedeliverUnacknowledgedMessages

pulsar.proto.CommandRedeliverUnacknowledgedMessages

Fields
Name Type Label Default Description
consumer_id uint64 required
message_ids MessageIdData repeated
CommandSeek

pulsar.proto.CommandSeek

Reset an existing consumer to a particular message id

Fields
Name Type Label Default Description
consumer_id uint64 required
request_id uint64 required
message_id MessageIdData optional
CommandSend

pulsar.proto.CommandSend

Fields
Name Type Label Default Description
producer_id uint64 required
sequence_id uint64 required
num_messages int32 optional 1
CommandSendError

pulsar.proto.CommandSendError

Fields
Name Type Label Default Description
producer_id uint64 required
sequence_id uint64 required
error ServerError required
message string required
CommandSendReceipt

pulsar.proto.CommandSendReceipt

Fields
Name Type Label Default Description
producer_id uint64 required
sequence_id uint64 required
message_id MessageIdData optional
CommandSubscribe

pulsar.proto.CommandSubscribe

Fields
Name Type Label Default Description
topic string required
subscription string required
subType SubType required
consumer_id uint64 required
request_id uint64 required
consumer_name string optional
priority_level int32 optional
durable bool optional true Signal wether the subscription should be backed by a durable cursor or not
start_message_id MessageIdData optional If specified, the subscription will position the cursor markd-delete position on the particular message id and will send messages from that point
metadata KeyValue repeated Add optional metadata key=value to this consumer
read_compacted bool optional
schema Schema optional
initialPosition InitialPosition optional Latest Signal wthether the subscription will initialize on latest or not -- earliest
CommandSuccess

pulsar.proto.CommandSuccess

Fields
Name Type Label Default Description
request_id uint64 required
schema Schema optional
CommandUnsubscribe

pulsar.proto.CommandUnsubscribe

Fields
Name Type Label Default Description
consumer_id uint64 required
request_id uint64 required
EncryptionKeys

pulsar.proto.EncryptionKeys

Fields
Name Type Label Default Description
key string required
value bytes required
metadata KeyValue repeated
KeyLongValue

pulsar.proto.KeyLongValue

Fields
Name Type Label Default Description
key string required
value uint64 required
KeyValue

pulsar.proto.KeyValue

Fields
Name Type Label Default Description
key string required
value string required
MessageIdData

pulsar.proto.MessageIdData

Fields
Name Type Label Default Description
ledgerId uint64 required
entryId uint64 required
partition int32 optional -1
batch_index int32 optional -1
MessageMetadata

pulsar.proto.MessageMetadata

Fields
Name Type Label Default Description
producer_name string required
sequence_id uint64 required
publish_time uint64 required
properties KeyValue repeated
replicated_from string optional Property set on replicated message, includes the source cluster name
partition_key string optional key to decide partition for the msg
replicate_to string repeated Override namespace's replication
compression CompressionType optional NONE
uncompressed_size uint32 optional 0
num_messages_in_batch int32 optional 1 Removed below checksum field from Metadata as it should be part of send-command which keeps checksum of header + payload optional sfixed64 checksum = 10; differentiate single and batch message metadata
event_time uint64 optional 0 the timestamp that this event occurs. it is typically set by applications. if this field is omitted, `publish_time` can be used for the purpose of `event_time`.
encryption_keys EncryptionKeys repeated Contains encryption key name, encrypted key and metadata to describe the key
encryption_algo string optional Algorithm used to encrypt data key
encryption_param bytes optional Additional parameters required by encryption
schema_version bytes optional
partition_key_b64_encoded bool optional false
Schema

pulsar.proto.Schema

Fields
Name Type Label Default Description
name string required
schema_data bytes required
type Type required
properties KeyValue repeated
SingleMessageMetadata

pulsar.proto.SingleMessageMetadata

Fields
Name Type Label Default Description
properties KeyValue repeated
partition_key string optional
payload_size int32 required
compacted_out bool optional false
event_time uint64 optional 0 the timestamp that this event occurs. it is typically set by applications. if this field is omitted, `publish_time` can be used for the purpose of `event_time`.
partition_key_b64_encoded bool optional false

Protobuf enums

AuthMethod

Enum values
Name Number Description
AuthMethodNone 0
AuthMethodYcaV1 1
AuthMethodAthens 2
Type

Enum values
Name Number Description
CONNECT 2
CONNECTED 3
SUBSCRIBE 4
PRODUCER 5
SEND 6
SEND_RECEIPT 7
SEND_ERROR 8
MESSAGE 9
ACK 10
FLOW 11
UNSUBSCRIBE 12
SUCCESS 13
ERROR 14
CLOSE_PRODUCER 15
CLOSE_CONSUMER 16
PRODUCER_SUCCESS 17
PING 18
PONG 19
REDELIVER_UNACKNOWLEDGED_MESSAGES 20
PARTITIONED_METADATA 21
PARTITIONED_METADATA_RESPONSE 22
LOOKUP 23
LOOKUP_RESPONSE 24
CONSUMER_STATS 25
CONSUMER_STATS_RESPONSE 26
REACHED_END_OF_TOPIC 27
SEEK 28
GET_LAST_MESSAGE_ID 29
GET_LAST_MESSAGE_ID_RESPONSE 30
ACTIVE_CONSUMER_CHANGE 31
GET_TOPICS_OF_NAMESPACE 32
GET_TOPICS_OF_NAMESPACE_RESPONSE 33
GET_SCHEMA 34
GET_SCHEMA_RESPONSE 35
AckType

Enum values
Name Number Description
Individual 0
Cumulative 1
ValidationError

Acks can contain a flag to indicate the consumer received an invalid message that got discarded before being passed on to the application.

Enum values
Name Number Description
UncompressedSizeCorruption 0
DecompressionError 1
ChecksumMismatch 2
BatchDeSerializeError 3
DecryptionError 4
Mode

Enum values
Name Number Description
PERSISTENT 0
NON_PERSISTENT 1
ALL 2
LookupType

Enum values
Name Number Description
Redirect 0
Connect 1
Failed 2
LookupType

Enum values
Name Number Description
Success 0
Failed 1
InitialPosition

Enum values
Name Number Description
Latest 0
Earliest 1
SubType

Enum values
Name Number Description
Exclusive 0
Shared 1
Failover 2
CompressionType

Enum values
Name Number Description
NONE 0
LZ4 1
ZLIB 2
ProtocolVersion

Each protocol version identify new features that are incrementally added to the protocol

Enum values
Name Number Description
v0 0

Initial versioning

v1 1

Added application keep-alive

v2 2

Added RedeliverUnacknowledgedMessages Command

v3 3

Added compression with LZ4 and ZLib

v4 4

Added batch message support

v5 5

Added disconnect client w/o closing connection

v6 6

Added checksum computation for metadata + payload

v7 7

Added CommandLookupTopic - Binary Lookup

v8 8

Added CommandConsumerStats - Client fetches broker side consumer stats

v9 9

Added end of topic notification

v10 10

Added proxy to broker

v11 11

C++ consumers before this version are not correctly handling the checksum field

v12 12

Added get topic’s last messageId from broker

v13 13

Added CommandActiveConsumerChange Added CommandGetTopicsOfNamespace

Schema-registry : added avro schema format for json

Type

Enum values
Name Number Description
None 0
String 1
Json 2
Protobuf 3
Avro 4
ServerError

Enum values
Name Number Description
UnknownError 0
MetadataError 1

Error with ZK/metadata

PersistenceError 2

Error writing reading from BK

AuthenticationError 3

Non valid authentication

AuthorizationError 4

Not authorized to use resource

ConsumerBusy 5

Unable to subscribe/unsubscribe because

ServiceNotReady 6

other consumers are connected

Any error that requires client retry operation with a fresh lookup

ProducerBlockedQuotaExceededError 7

Unable to create producer because backlog quota exceeded

ProducerBlockedQuotaExceededException 8

Exception while creating producer because quota exceeded

ChecksumError 9

Error while verifying message checksum

UnsupportedVersionError 10

Error when an older client/version doesn’t support a required feature

TopicNotFound 11

Topic not found

SubscriptionNotFound 12

Subscription not found

ConsumerNotFound 13

Consumer not found

TooManyRequests 14

Error with too many simultaneously request

TopicTerminatedError 15

The topic has been terminated

ProducerBusy 16

Producer with same name is already connected

InvalidTopicName 17

The topic name is not valid