Pulsar concepts and architecture
A high-level overview of Pulsar's moving pieces
Pulsar’s key features include:
- Native support for multiple clusters in a Pulsar instance, with seamless geo-replication of messages across clusters
- Very low publish and end-to-end latency
- Seamless scalability out to over a million topics
- A simple client API with bindings for Java, Python, and C++
- Multiple subscription modes for topics (exclusive, shared, and failover)
- Guaranteed message delivery with persistent message storage provided by Apache BookKeeper
Producers, consumers, topics, and subscriptions
Pulsar is built on the publish-subscribe pattern, aka pub-sub. In this pattern, producers publish messages to topics. Consumers can then subscribe to those topics, process incoming messages, and send an acknowledgement when processing is complete.
Once a subscription has been created, all messages will be retained by Pulsar, even if the consumer gets disconnected. Retained messages will be discarded only when a consumer acknowledges that they’ve been successfully processed.
A producer is a process that attaches to a topic and publishes messages to a Pulsar broker for processing.
Producers can send messages to brokers either synchronously (sync) or asynchronously (async).
|Mode||Description|
|Sync send||The producer waits for an acknowledgement from the broker after sending each message. If no acknowledgement is received, the producer treats the send operation as a failure.|
|Async send||The producer puts the message in a blocking queue and returns immediately. The client library then sends the message to the broker in the background. If the queue is full (the maximum size is configurable), the producer either blocks or fails immediately when the API is called, depending on the arguments passed to the producer.|
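The difference between the two send modes can be sketched with a bounded queue and futures from the Java standard library. This is only an illustrative model, not the Pulsar client: the class `PendingSendQueue`, its methods, and the sequence IDs it returns are hypothetical names chosen for this example, and the sketch always fails fast on a full queue rather than offering the blocking option.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical model of a producer's pending-message queue; not the Pulsar client API. */
final class PendingSendQueue {
    /** A queued message plus the future that completes when the broker acknowledges it. */
    private static final class Pending {
        final String payload;
        final CompletableFuture<Long> ack = new CompletableFuture<>();
        Pending(String payload) { this.payload = payload; }
    }

    private final BlockingQueue<Pending> queue;
    private long nextSequenceId = 0;

    PendingSendQueue(int maxPendingMessages) {
        this.queue = new ArrayBlockingQueue<>(maxPendingMessages);
    }

    /** Async send: enqueue and return immediately; this sketch fails fast when the queue is full. */
    CompletableFuture<Long> sendAsync(String payload) {
        Pending pending = new Pending(payload);
        if (!queue.offer(pending)) {
            pending.ack.completeExceptionally(new IllegalStateException("pending queue is full"));
        }
        return pending.ack;
    }

    /** Sync send: same path, but the caller waits for the acknowledgement before returning. */
    long send(String payload) {
        CompletableFuture<Long> ack = sendAsync(payload);
        while (!ack.isDone()) {
            flushOne(); // stands in for the client's background I/O thread
        }
        return ack.join(); // throws CompletionException if the send failed
    }

    /** Simulates the broker acknowledging the oldest pending message. */
    boolean flushOne() {
        Pending pending = queue.poll();
        if (pending == null) {
            return false;
        }
        pending.ack.complete(nextSequenceId++);
        return true;
    }
}
```

In this model a caller of `sendAsync` learns about a full queue through the failed future, whereas a caller of `send` sees the failure as an exception.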
Messages published by producers can be compressed during transportation in order to save bandwidth. Pulsar currently supports two types of compression: LZ4 and ZLIB.
If batching is enabled, the producer accumulates messages and sends them as a batch in a single request. The batch size is bounded by a maximum number of messages and a maximum publish latency.
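As a rough illustration of how the two batching limits interact, the following sketch flushes a batch either when it reaches a maximum message count or when its oldest message has waited for the maximum delay. The class `BatchAccumulator` and its parameters are hypothetical, and time is passed in explicitly so the behavior is easy to follow; the real client's batching logic may differ.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical batcher: flushes on a maximum message count or a maximum publish delay. */
final class BatchAccumulator {
    private final int maxMessages;
    private final long maxDelayMillis;
    private final List<String> current = new ArrayList<>();
    private long firstMessageAtMillis;

    BatchAccumulator(int maxMessages, long maxDelayMillis) {
        this.maxMessages = maxMessages;
        this.maxDelayMillis = maxDelayMillis;
    }

    /** Adds a message; returns the batch to send if it is now full, otherwise an empty list. */
    List<String> add(String message, long nowMillis) {
        if (current.isEmpty()) {
            firstMessageAtMillis = nowMillis; // the delay is measured from the first message
        }
        current.add(message);
        return current.size() >= maxMessages ? drain() : new ArrayList<>();
    }

    /** Called periodically; returns the batch if its oldest message has waited long enough. */
    List<String> tick(long nowMillis) {
        boolean expired = !current.isEmpty()
                && nowMillis - firstMessageAtMillis >= maxDelayMillis;
        return expired ? drain() : new ArrayList<>();
    }

    private List<String> drain() {
        List<String> batch = new ArrayList<>(current);
        current.clear();
        return batch;
    }
}
```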
A consumer is a process that attaches to a topic via a subscription and then receives messages.
Messages can be received from brokers either synchronously (sync) or asynchronously (async).
|Mode||Description|
|Sync receive||A sync receive blocks until a message is available.|
|Async receive||An async receive returns immediately with a future value that completes once a new message is available.|
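The two receive modes can be modeled with `CompletableFuture` from the Java standard library. The sketch below is an assumption-laden illustration rather than the Pulsar consumer API: `ReceiverQueue`, `receiveAsync`, `receive`, and `onMessage` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical consumer-side queue illustrating sync and async receive. */
final class ReceiverQueue {
    private final Queue<String> messages = new ArrayDeque<>();
    private final Queue<CompletableFuture<String>> waiters = new ArrayDeque<>();

    /** Async receive: returns at once with a future that completes when a message arrives. */
    synchronized CompletableFuture<String> receiveAsync() {
        String message = messages.poll();
        if (message != null) {
            return CompletableFuture.completedFuture(message);
        }
        CompletableFuture<String> future = new CompletableFuture<>();
        waiters.add(future);
        return future;
    }

    /** Sync receive: blocks the calling thread until a message is available. */
    String receive() {
        return receiveAsync().join(); // not synchronized, so waiting does not hold the lock
    }

    /** Called when a message is pushed to this consumer. */
    synchronized void onMessage(String message) {
        CompletableFuture<String> waiter = waiters.poll();
        if (waiter != null) {
            waiter.complete(message); // hand the message to the oldest waiting receive
        } else {
            messages.add(message);
        }
    }
}
```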
When a consumer has successfully processed a message, it needs to send an acknowledgement to the broker so that the broker can discard the message (otherwise it stores the message).
Messages can be acknowledged either one by one or cumulatively. With cumulative acknowledgement, the consumer only needs to acknowledge the last message it received. All messages in the stream up to (and including) the provided message will not be re-delivered to that consumer.
Cumulative acknowledgement cannot be used with shared subscription mode, because shared mode involves multiple consumers having access to the same subscription.
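The difference between individual and cumulative acknowledgement can be sketched as operations on a sorted set of unacknowledged message IDs. The class `AckTracker` and the use of plain `long` IDs are assumptions made for illustration; Pulsar's actual message IDs and bookkeeping are more involved.

```java
import java.util.SortedSet;
import java.util.TreeSet;

/** Hypothetical tracker of delivered-but-unacknowledged message IDs for one consumer. */
final class AckTracker {
    private final TreeSet<Long> unacked = new TreeSet<>();

    /** Records that a message was delivered and is awaiting acknowledgement. */
    void delivered(long messageId) {
        unacked.add(messageId);
    }

    /** Individual ack: only the given message is marked as processed. */
    void ackIndividual(long messageId) {
        unacked.remove(messageId);
    }

    /** Cumulative ack: every message up to and including the given one is marked as processed. */
    void ackCumulative(long messageId) {
        unacked.headSet(messageId, true).clear();
    }

    /** Messages that would still be eligible for redelivery. */
    SortedSet<Long> unacked() {
        return new TreeSet<>(unacked);
    }
}
```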
Client libraries can provide their own listener implementations for consumers. The Java client, for example, provides a MessageListener interface. In this interface, the received method is called whenever a new message is received.
As in other pub-sub systems, topics in Pulsar are named channels for transmitting messages from producers to consumers. Topic names are URLs that have a well-defined structure:
{persistent|non-persistent}://property/cluster/namespace/topic
|Topic name component||Description|
|persistent / non-persistent||Identifies the type of topic. Pulsar supports two kinds of topics: persistent and non-persistent. In a persistent topic, all messages are durably persisted on disk (on multiple disks unless the broker is standalone), whereas a non-persistent topic does not persist messages to disk.|
|property||The topic’s tenant within the instance. Tenants are essential to multi-tenancy in Pulsar and can be spread across clusters.|
|cluster||Where the topic is located. Typically there will be one cluster for each geographical region or data center.|
|namespace||The administrative unit of the topic, which acts as a grouping mechanism for related topics. Most topic configuration is performed at the namespace level. Each property (tenant) can have multiple namespaces.|
|topic||The final part of the name. Topic names are freeform and have no special meaning in a Pulsar instance.|
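To make the components concrete, here is a minimal parser sketch for names of the form used elsewhere in this document, such as non-persistent://sample/standalone/ns1/my-topic. The class `TopicName` below is a hypothetical helper written for this example, not a Pulsar class, and it assumes that any extra slashes belong to the freeform topic part.

```java
/** Hypothetical parser for topic names; written for illustration only. */
final class TopicName {
    final boolean persistent;
    final String property;
    final String cluster;
    final String namespace;
    final String topic;

    private TopicName(boolean persistent, String property, String cluster,
                      String namespace, String topic) {
        this.persistent = persistent;
        this.property = property;
        this.cluster = cluster;
        this.namespace = namespace;
        this.topic = topic;
    }

    static TopicName parse(String name) {
        int schemeEnd = name.indexOf("://");
        if (schemeEnd < 0) {
            throw new IllegalArgumentException("missing scheme: " + name);
        }
        String scheme = name.substring(0, schemeEnd);
        if (!scheme.equals("persistent") && !scheme.equals("non-persistent")) {
            throw new IllegalArgumentException("unknown topic type: " + scheme);
        }
        // A limit of 4 keeps any further '/' characters inside the freeform topic part.
        String[] parts = name.substring(schemeEnd + 3).split("/", 4);
        if (parts.length != 4) {
            throw new IllegalArgumentException("expected property/cluster/namespace/topic: " + name);
        }
        for (String part : parts) {
            if (part.isEmpty()) {
                throw new IllegalArgumentException("empty component in: " + name);
            }
        }
        return new TopicName(scheme.equals("persistent"), parts[0], parts[1], parts[2], parts[3]);
    }

    /** The namespace path, for example sample/standalone/ns1. */
    String namespacePath() {
        return property + "/" + cluster + "/" + namespace;
    }
}
```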
No need to explicitly create new topics
Applications do not need to create topics explicitly. If a client attempts to write to or receive messages from a topic that does not yet exist, Pulsar automatically creates that topic under the namespace.
A namespace is a logical grouping within a property. A property can create multiple namespaces via the admin API. For instance, a property with different applications can create a separate namespace for each application. A namespace allows the application to create and manage a hierarchy of topics. For example, my-property/my-cluster/my-property-app1 is a namespace for the application my-property-app1 in the cluster my-cluster. The application can create any number of topics under the namespace.
A subscription is a named configuration rule that determines how messages are delivered to consumers. There are three available subscription modes in Pulsar: exclusive, shared, and failover. These modes are illustrated in the figure below.
In exclusive mode, only a single consumer is allowed to attach to the subscription. If a second consumer attempts to subscribe to the topic using the same subscription, that consumer receives an error.
In the diagram above, only Consumer-A is allowed to consume messages.
Exclusive mode is the default subscription mode.
In shared or round robin mode, multiple consumers can attach to the same subscription. Messages are delivered in a round robin distribution across consumers, and any given message is delivered to only one consumer. When a consumer disconnects, all the messages that were sent to it and not acknowledged will be rescheduled for sending to the remaining consumers.
In the diagram above, Consumer-B-1 and Consumer-B-2 are able to subscribe to the topic, but Consumer-C-1 and others could as well.
Limitations of shared mode
There are two important things to be aware of when using shared mode:
- Message ordering is not guaranteed.
- You cannot use cumulative acknowledgment with shared mode.
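The shared-mode behavior described above (round robin delivery, one consumer per message, and rescheduling of unacknowledged messages on disconnect) can be sketched as follows. `SharedDispatcher` and its methods are hypothetical names for this illustration, and the backlog handling for the case where no consumer is attached is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of shared (round robin) dispatch for one subscription. */
final class SharedDispatcher {
    private final List<String> consumers = new ArrayList<>();
    private final Map<String, List<String>> unacked = new HashMap<>();
    private final List<String> backlog = new ArrayList<>(); // held while nobody is attached
    private int next = 0;

    void addConsumer(String name) {
        consumers.add(name);
        unacked.put(name, new ArrayList<>());
        List<String> waiting = new ArrayList<>(backlog);
        backlog.clear();
        for (String message : waiting) {
            dispatch(message);
        }
    }

    /** Delivers to exactly one consumer; returns its name, or null if the message was parked. */
    String dispatch(String message) {
        if (consumers.isEmpty()) {
            backlog.add(message);
            return null;
        }
        String target = consumers.get(next);
        next = (next + 1) % consumers.size();
        unacked.get(target).add(message);
        return target;
    }

    void ack(String consumer, String message) {
        List<String> pending = unacked.get(consumer);
        if (pending != null) {
            pending.remove(message);
        }
    }

    /** On disconnect, unacknowledged messages are rescheduled to the remaining consumers. */
    void disconnect(String consumer) {
        List<String> pending = unacked.remove(consumer);
        if (pending == null) {
            return;
        }
        consumers.remove(consumer);
        next = 0;
        for (String message : pending) {
            dispatch(message);
        }
    }

    List<String> unackedFor(String consumer) {
        List<String> pending = unacked.get(consumer);
        return pending == null ? new ArrayList<>() : new ArrayList<>(pending);
    }
}
```

Because a rescheduled message can arrive at another consumer after newer messages, the sketch also shows why ordering is not guaranteed in this mode.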
In failover mode, multiple consumers can attach to the same subscription. The consumers will be lexically sorted by the consumer’s name and the first consumer will initially be the only one receiving messages. This consumer is called the master consumer.
When the master consumer disconnects, all (non-acked and subsequent) messages will be delivered to the next consumer in line.
In the diagram above, Consumer-C-1 is the master consumer while Consumer-C-2 would be the next in line to receive messages if Consumer-C-1 disconnected.
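Master selection in failover mode can be sketched with a sorted set of consumer names. `FailoverSubscription` is a hypothetical class for illustration. One simplification to note: in this sketch a newly attached consumer with a lexically smaller name becomes master immediately, which may not match the broker's actual behavior.

```java
import java.util.TreeSet;

/** Hypothetical model of master selection for a failover subscription. */
final class FailoverSubscription {
    // TreeSet keeps consumer names in lexical order.
    private final TreeSet<String> consumers = new TreeSet<>();

    void attach(String consumerName) {
        consumers.add(consumerName);
    }

    void disconnect(String consumerName) {
        consumers.remove(consumerName);
    }

    /** The master is the first consumer in lexical order, or null if none is attached. */
    String master() {
        return consumers.isEmpty() ? null : consumers.first();
    }
}
```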
Normal topics can be served only by a single broker, which limits the topic’s maximum throughput. Partitioned topics are a special type of topic that can be handled by multiple brokers, which allows for much higher throughput.
Behind the scenes, a partitioned topic is actually implemented as N internal topics, where N is the number of partitions. When publishing messages to a partitioned topic, each message is routed to one of several brokers. The distribution of partitions across brokers is handled automatically by Pulsar.
The diagram below illustrates this:
Here, the topic T1 has five partitions (P0 through P4) split across three brokers. Because there are more partitions than brokers, two brokers handle two partitions apiece, while the third handles only one (again, Pulsar handles this distribution of partitions automatically).
Messages for this topic are broadcast to two consumers. The routing mode determines which partition each message is published to, while the subscription mode determines which messages go to which consumers.
Decisions about routing and subscription modes can be made separately in most cases. Throughput concerns should guide partitioning/routing decisions while subscription decisions should be guided by application semantics.
There is no difference between partitioned topics and normal topics in terms of how subscription modes work, as partitioning only determines what happens between when a message is published by a producer and processed and acknowledged by a consumer.
Partitioned topics need to be explicitly created via the admin API. The number of partitions can be specified when creating the topic.
When publishing to partitioned topics, you must specify a routing mode. The routing mode determines which partition—that is, which internal topic—each message should be published to.
There are three routing modes available by default:
|Mode||Description||Ordering guarantee|
|Key hash||If a key property has been specified on the message, the partitioned producer hashes the key and assigns the message to a particular partition.||Per-key-bucket ordering|
|Single default partition||If no key is provided, all of a producer’s messages are routed to a single dedicated partition, which is initially selected at random.||Per-producer ordering|
|Round robin distribution||If no key is provided, messages are routed to the partitions in round-robin fashion to achieve maximum throughput.||None|
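The three routing modes in the table can be sketched as a single partition-selection function. `PartitionRouter` is a hypothetical class, and the use of `String.hashCode` is an assumption for illustration; the hash function that Pulsar actually applies to keys may differ.

```java
import java.util.Random;

/** Hypothetical partition chooser illustrating the three routing modes. */
final class PartitionRouter {
    enum Mode { SINGLE_PARTITION, ROUND_ROBIN }

    private final int numPartitions;
    private final Mode mode;
    private final int singlePartition; // picked once at random, then reused by this producer
    private int roundRobinCounter = 0;

    PartitionRouter(int numPartitions, Mode mode, Random random) {
        this.numPartitions = numPartitions;
        this.mode = mode;
        this.singlePartition = random.nextInt(numPartitions);
    }

    /** Chooses a partition index in [0, numPartitions); the key may be null. */
    int choosePartition(String key) {
        if (key != null) {
            // Key hash: the same key always maps to the same partition.
            return Math.floorMod(key.hashCode(), numPartitions);
        }
        if (mode == Mode.SINGLE_PARTITION) {
            return singlePartition;
        }
        int partition = roundRobinCounter;
        roundRobinCounter = (roundRobinCounter + 1) % numPartitions;
        return partition;
    }
}
```

Since each partition is an internal topic with its own ordering, the ordering guarantees in the table follow from how many partitions a given stream of messages can land on.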
Non-persistent topics are still experimental, and implementation details may change in a future release.
As the name suggests, a non-persistent topic does not persist messages to any durable storage, unlike a persistent topic, where messages are durably persisted on multiple disks.
With persistent delivery, messages are persisted to disk/database so that they survive a broker restart or subscriber failover. With non-persistent delivery, if a broker is killed or a subscriber is disconnected, the subscriber loses all in-transit messages. Clients may therefore see message loss with non-persistent topics.
- In a non-persistent topic, as soon as the broker receives a published message, it delivers that message to all connected subscribers without persisting it to any storage. If a subscriber disconnects from the broker, the broker cannot deliver the in-transit messages, and the subscriber will never receive them. The broker also drops a message for a consumer if the consumer does not have enough permits to consume it or if the consumer's TCP channel is not writable. The consumer receiver queue size (to accommodate enough permits) and the TCP receiver window size (to keep the channel writable) should therefore be configured properly to avoid message drops for that consumer.
- The broker allows only a configured number of in-flight messages per client connection. If a producer tries to publish messages at a higher rate, the broker silently drops the new incoming messages without processing them or delivering them to subscribers. However, the broker acknowledges those dropped messages with a special message ID (msg-id: -1:-1) to signal the drop to the producer.
Non-persistent messaging is usually faster than persistent messaging because the broker does not persist messages and sends the ack back to the producer as soon as the message has been delivered to all connected subscribers. Producers therefore see comparatively low publish latency with non-persistent topics.
A non-persistent topic name looks like this:
non-persistent://property/cluster/namespace/topic
Producers and consumers connect to a non-persistent topic in the same way as to a persistent topic, except that the topic name must start with non-persistent.
Non-persistent topics support all three subscription modes (exclusive, shared, and failover), which are explained in detail in GettingStarted.
```java
// Consumer: subscribe to a non-persistent topic
PulsarClient client = PulsarClient.create("pulsar://localhost:6650");
Consumer consumer = client.subscribe(
        "non-persistent://sample/standalone/ns1/my-topic",
        "my-subscription-name");
```
```java
// Producer: publish to a non-persistent topic
PulsarClient client = PulsarClient.create("pulsar://localhost:6650");
Producer producer = client.createProducer(
        "non-persistent://sample/standalone/ns1/my-topic");
```
Sometimes you may need to dedicate a few brokers in a cluster to serving only non-persistent topics.
Broker configuration that restricts a broker to owning only the configured types of topics:
```
# Prevents the broker from loading persistent topics
enablePersistentTopics=false
# Allows the broker to load non-persistent topics
enableNonPersistentTopics=true
```
At the highest level, a Pulsar instance is composed of one or more Pulsar clusters. Clusters within an instance can replicate data amongst themselves.
In a Pulsar cluster:
- One or more brokers handle and load balance incoming messages from producers, dispatch messages to consumers, communicate with global ZooKeeper to handle various coordination tasks, store messages in BookKeeper instances (aka bookies), rely on a cluster-specific ZooKeeper cluster for certain tasks, and more.
- A BookKeeper cluster consisting of one or more bookies handles persistent storage of messages.
- A ZooKeeper cluster specific to that cluster handles cluster-level configuration and coordination.
The diagram below provides an illustration of a Pulsar cluster:
At the broader instance level, an instance-wide ZooKeeper cluster called global ZooKeeper handles coordination tasks involving multiple clusters, for example geo-replication.
The Pulsar message broker is a stateless component that’s primarily responsible for running two other components:
- An HTTP server that exposes a REST API for both administrative tasks and topic lookup for producers and consumers
- A dispatcher, which is an asynchronous TCP server over a custom binary protocol used for all data transfers
Messages are typically dispatched out of a managed ledger cache for the sake of performance, unless the backlog exceeds the cache size. If the backlog grows too large for the cache, the broker will start reading entries from BookKeeper.
Finally, to support geo-replication on global topics, the broker manages replicators that tail the entries published in the local region and republish them to the remote region using the Pulsar Java client library.
For a guide to managing Pulsar brokers, see the Clusters and brokers guide.
A Pulsar instance consists of one or more Pulsar clusters. Clusters, in turn, consist of:
- One or more Pulsar brokers
- A ZooKeeper quorum used for cluster-level configuration and coordination
- An ensemble of bookies used for persistent storage of messages
Clusters can replicate amongst themselves using geo-replication.
For a guide to managing Pulsar clusters, see the Clusters and brokers guide.
In any Pulsar instance, there is an instance-wide cluster called global that you can use to manage non-cluster-specific namespaces and topics. The global cluster is created for you automatically when you initialize metadata for the first cluster in your instance.
Global topic names have this basic structure (note the global cluster):
persistent://property/global/namespace/topic
Pulsar uses Apache ZooKeeper for metadata storage, cluster configuration, and coordination. In a Pulsar instance:
- A global ZooKeeper quorum stores configuration for properties, namespaces, and other entities that need to be globally consistent.
- Each cluster has its own local ZooKeeper ensemble that stores cluster-specific configuration and coordination such as ownership metadata, broker load reports, BookKeeper ledger metadata, and more.
When creating a new cluster
Pulsar provides guaranteed message delivery for applications. If a message successfully reaches a Pulsar broker, it will be delivered to its intended target.
This guarantee requires that non-acknowledged messages are stored in a durable manner until they can be delivered to and acknowledged by consumers. This mode of messaging is commonly called persistent messaging. In Pulsar, N copies of all messages are stored and synced on disk, for example 4 copies across two servers with mirrored RAID volumes on each server.
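Pulsar uses Apache BookKeeper for persistent message storage. BookKeeper provides several advantages for Pulsar:
- It enables Pulsar to utilize many independent logs, called ledgers. Multiple ledgers can be created for topics over time.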
- It offers very efficient storage for sequential data that handles entry replication.
- It guarantees read consistency of ledgers in the presence of various system failures.
- It offers even distribution of I/O across bookies.
- It’s horizontally scalable in both capacity and throughput. Capacity can be immediately increased by adding more bookies to a cluster.
- Bookies are designed to handle thousands of ledgers with concurrent reads and writes. By using multiple disk devices (one for the journal and another for general storage), bookies are able to isolate the effects of read operations from the latency of ongoing write operations.
In addition to message data, cursors are also persistently stored in BookKeeper. Cursors are subscription positions for consumers. BookKeeper enables Pulsar to store consumer position in a scalable fashion.
At the moment, Pulsar only supports persistent message storage. This accounts for the persistent in all topic names. Here’s an example:
In the future, Pulsar will support ephemeral message storage.
A ledger is an append-only data structure with a single writer that is assigned to multiple BookKeeper storage nodes, or bookies. Ledger entries are replicated to multiple bookies. Ledgers themselves have very simple semantics:
- A Pulsar broker can create a ledger, append entries to the ledger, and close the ledger.
- After the ledger has been closed—either explicitly or because the writer process crashed—it can then be opened only in read-only mode.
- Finally, when entries in the ledger are no longer needed, the whole ledger can be deleted from the system (across all bookies).
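The ledger semantics listed above (create, append, close, then read-only) can be modeled in a few lines. `LedgerSketch` is a hypothetical in-memory class for illustration only; it leaves out replication across bookies and deletion, and it is not the BookKeeper client API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory model of a single-writer, append-only ledger. */
final class LedgerSketch {
    private final List<byte[]> entries = new ArrayList<>();
    private boolean closed = false;

    /** Appends an entry and returns its entry ID; only allowed while the ledger is open. */
    long append(byte[] entry) {
        if (closed) {
            throw new IllegalStateException("ledger is closed and can only be read");
        }
        entries.add(entry.clone());
        return entries.size() - 1;
    }

    /** Closes the ledger; from this point on it is read-only. */
    void close() {
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }

    /** Reads work both before and after the ledger is closed. */
    byte[] read(long entryId) {
        return entries.get((int) entryId).clone();
    }

    /** ID of the last appended entry, or -1 if the ledger is empty. */
    long lastEntryId() {
        return entries.size() - 1;
    }
}
```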
Ledger read consistency
The main strength of BookKeeper is that it guarantees read consistency in ledgers in the presence of failures. Since the ledger can only be written to by a single process, that process is free to append entries very efficiently, without need to obtain consensus. After a failure, the ledger will go through a recovery process that will finalize the state of the ledger and establish which entry was last committed to the log. After that point, all readers of the ledger are guaranteed to see the exact same content.
Given that BookKeeper ledgers provide a single log abstraction, a library was developed on top of the ledger called the managed ledger that represents the storage layer for a single topic. A managed ledger represents the abstraction of a stream of messages with a single writer that keeps appending at the end of the stream and multiple cursors that are consuming the stream, each with its own associated position.
Internally, a single managed ledger uses multiple BookKeeper ledgers to store the data. There are two reasons to have multiple ledgers:
- After a failure, a ledger is no longer writable and a new one needs to be created.
- A ledger can be deleted when all cursors have consumed the messages it contains. This allows for periodic rollover of ledgers.
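A rough sketch of how a managed ledger might roll over to new ledgers and delete old ones once every cursor has passed them is shown below. `ManagedLedgerSketch`, the entry-count rollover threshold, and the integer positions are all assumptions made for this example; the real managed ledger library is considerably more elaborate.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of a managed ledger: one stream, several ledgers, several cursors. */
final class ManagedLedgerSketch {
    private final int maxEntriesPerLedger;
    // Each inner list stands for one ledger; only the last one is open for writes.
    private final Deque<List<String>> ledgers = new ArrayDeque<>();
    private final Map<String, Long> cursors = new HashMap<>(); // cursor name -> next position to read
    private long firstRetainedPosition = 0; // position of the first entry in the oldest ledger
    private long nextPosition = 0;

    ManagedLedgerSketch(int maxEntriesPerLedger) {
        this.maxEntriesPerLedger = maxEntriesPerLedger;
        ledgers.addLast(new ArrayList<>());
    }

    /** Opens a cursor positioned at the current end of the stream. */
    void openCursor(String name) {
        cursors.put(name, nextPosition);
    }

    /** Appends at the end of the stream, rolling over to a new ledger when the current one is full. */
    long append(String message) {
        if (ledgers.getLast().size() >= maxEntriesPerLedger) {
            ledgers.addLast(new ArrayList<>());
        }
        ledgers.getLast().add(message);
        return nextPosition++;
    }

    /** Marks everything up to and including the position as consumed, then trims old ledgers. */
    void markConsumed(String cursor, long position) {
        cursors.put(cursor, position + 1);
        long slowest = cursors.values().stream()
                .mapToLong(Long::longValue).min().orElse(firstRetainedPosition);
        // Delete closed ledgers whose entries all lie before the slowest cursor.
        while (ledgers.size() > 1
                && firstRetainedPosition + ledgers.getFirst().size() <= slowest) {
            firstRetainedPosition += ledgers.removeFirst().size();
        }
    }

    int ledgerCount() {
        return ledgers.size();
    }
}
```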
In BookKeeper, journal files contain BookKeeper transaction logs. Before making an update to a ledger, a bookie needs to ensure that a transaction describing the update is written to persistent (non-volatile) storage. A new journal file is created when the bookie starts or when the older journal file reaches the journal file size threshold (the threshold is configurable).
A future version of BookKeeper will support non-persistent messaging and thus multiple durability modes at the topic level. This will enable you to set the durability mode at the topic level, replacing the persistent in topic names with a non-persistent indicator.
Pulsar enables messages to be produced and consumed in different geo-locations. For instance, your application may be publishing data in one region or market and you would like to process it for consumption in other regions or markets. Geo-replication in Pulsar enables you to do that.
Pulsar was created from the ground up as a multi-tenant system. To support multi-tenancy, Pulsar has a concept of properties. Properties can be spread across clusters and can each have their own authentication and authorization scheme applied to them. They are also the administrative unit at which storage quotas, message TTL, and isolation policies can be managed.
The multi-tenant nature of Pulsar is reflected most visibly in topic URLs, which have this structure:
persistent://property/cluster/namespace/topic
As you can see, the property is the most basic unit of categorization for topics (and even more fundamental than the cluster).
Properties and namespaces
Pulsar was designed from the ground up to be a multi-tenant system. In Pulsar, tenants are identified by properties. Properties are the highest administrative unit within a Pulsar instance. Within properties
To each property in a Pulsar instance you can assign:
- An authorization scheme
- The set of clusters to which the property applies
Properties and namespaces are two key concepts of Pulsar to support multi-tenancy.
- A property identifies a tenant. Pulsar is provisioned for a specified property with appropriate capacity allocated to the property.
- A namespace is the administrative unit within a property. The configuration policies set on a namespace apply to all the topics created in that namespace. A property may create multiple namespaces via self-administration using the REST API and CLI tools. For instance, a property with different applications can create a separate namespace for each application.
Names for topics in the same namespace will look like this:
Authentication and Authorization
Pulsar supports a pluggable authentication mechanism, which can be configured at the broker. It also supports authorization to identify clients and their access rights on topics and properties.
Pulsar exposes a client API with language bindings for Java, Python, and C++. The client API optimizes and encapsulates Pulsar’s client-broker communication protocol and exposes a simple and intuitive API for use by applications.
Under the hood, the current official Pulsar client libraries support transparent reconnection and/or connection failover to brokers, queuing of messages until acknowledged by the broker, and heuristics such as connection retries with backoff.
Custom client libraries
If you’d like to create your own client library, we recommend consulting the documentation on Pulsar’s custom binary protocol.
Client setup phase
When an application wants to create a producer/consumer, the Pulsar client library will initiate a setup phase that is composed of two steps:
- The client will attempt to determine the owner of the topic by sending an HTTP lookup request to the broker. The request can reach any of the active brokers, which, by looking at the (cached) ZooKeeper metadata, will know who is serving the topic or, if nobody is serving it, will try to assign it to the least loaded broker.
- Once the client library has the broker address, it will create a TCP connection (or reuse an existing connection from the pool) and authenticate it. Within this connection, client and broker exchange binary commands from a custom protocol. At this point the client will send a command to create producer/consumer to the broker, which will comply after having validated the authorization policy.
Whenever the TCP connection breaks, the client will immediately re-initiate this setup phase and will keep trying with exponential backoff to re-establish the producer or consumer until the operation succeeds.
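The retry behavior described above can be sketched as a loop that repeats the setup attempt and doubles the wait between attempts up to a cap. `ReconnectBackoff`, its delay values, and the `retry` helper are hypothetical; the sketch only adds up the delays instead of sleeping, and the actual client's timing and jitter are not specified here.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical exponential backoff schedule for re-running the client setup phase. */
final class ReconnectBackoff {
    private final long initialMillis;
    private final long maxMillis;
    private long nextMillis;

    ReconnectBackoff(long initialMillis, long maxMillis) {
        this.initialMillis = initialMillis;
        this.maxMillis = maxMillis;
        this.nextMillis = initialMillis;
    }

    /** Returns the delay before the next attempt, then doubles it up to the cap. */
    long nextDelayMillis() {
        long delay = nextMillis;
        nextMillis = Math.min(maxMillis, nextMillis * 2);
        return delay;
    }

    /** Called after a successful setup so that the next failure starts from the initial delay. */
    void reset() {
        nextMillis = initialMillis;
    }

    /**
     * Tries the setup immediately and keeps retrying until it succeeds.
     * Returns the total delay that would have been waited; a real client would sleep instead.
     */
    static long retry(BooleanSupplier setupAttempt, ReconnectBackoff backoff) {
        long waitedMillis = 0;
        while (!setupAttempt.getAsBoolean()) {
            waitedMillis += backoff.nextDelayMillis();
        }
        backoff.reset();
        return waitedMillis;
    }
}
```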
Clients connecting to Pulsar brokers need to be able to communicate with an entire Pulsar instance using a single URL. Pulsar provides a built-in service discovery mechanism that you can set up using the instructions in the Deploying a Pulsar instance guide.
You can use your own service discovery system if you’d like. If you use your own system, there is just one requirement: when a client performs an HTTP request to an endpoint, such as http://pulsar.us-west.example.com:8080, the client needs to be redirected to some active broker in the desired cluster, whether via DNS, an HTTP or IP redirect, or some other means.