The Pulsar C++ client


We welcome contributions from the open source community. Please make sure your changes are backward compatible with gcc-4.4.7 and Boost 1.41.

Supported platforms

The Pulsar C++ client has been successfully tested on macOS and Linux.

System requirements

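You need to have the following installed to use the C++ client: CMake, Boost, Protocol Buffers, Log4CXX, libcurl, OpenSSL, JsonCpp, and Google Test. These are the packages installed in the Compilation steps below.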

Compilation

There are separate compilation instructions for macOS and Linux. For both systems, start by cloning the Pulsar repository:

$ git clone https://github.com/apache/incubator-pulsar.git

Linux

First, install all of the necessary dependencies:

$ apt-get install cmake libssl-dev libcurl4-openssl-dev liblog4cxx-dev \
  libprotobuf-dev libboost-all-dev libgtest-dev libjsoncpp-dev

Then compile and install Google Test:

$ git clone https://github.com/google/googletest.git && cd googletest
$ sudo cmake .
$ sudo make
$ sudo cp *.a /usr/lib

Finally, return to the root of the Pulsar repo that you cloned and compile the Pulsar client library for C++:

$ cd pulsar-client-cpp
$ cmake .
$ make

The resulting files, libpulsar.so and libpulsar.a, will be placed in the lib folder of the repo while two tools, perfProducer and perfConsumer, will be placed in the perf directory.

macOS

First, install all of the necessary dependencies:

# OpenSSL installation
$ brew install openssl
$ export OPENSSL_INCLUDE_DIR=/usr/local/opt/openssl/include/
$ export OPENSSL_ROOT_DIR=/usr/local/opt/openssl/

# Protocol Buffers installation
$ brew tap homebrew/versions
$ brew install protobuf260

# Boost installation
$ brew install boost

# Log4CXX installation
$ brew install log4cxx

# Google Test installation
$ git clone https://github.com/google/googletest.git
$ cd googletest
$ cmake .
$ make install

Then compile the Pulsar client library in the repo that you cloned:

$ cd pulsar-client-cpp
$ cmake .
$ make

Connection URLs

To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL.

Pulsar protocol URLs are assigned to specific clusters, use the pulsar scheme and have a default port of 6650. Here’s an example for localhost:

pulsar://localhost:6650

A URL for a production Pulsar cluster may look something like this:

pulsar://pulsar.us-west.example.com:6650

If you’re using TLS authentication, the URL will look something like this:

pulsar+ssl://pulsar.us-west.example.com:6651
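The URL forms above can be checked before they are handed to a client. The following is a minimal sketch, not part of the Pulsar client API: `ServiceUrl` and `parseServiceUrl` are hypothetical names introduced for illustration. It assumes the URL has the form scheme://host[:port] with no path, and that 6651 is the fallback port for pulsar+ssl (taken from the TLS example above; only the 6650 default is stated in the text).

```cpp
#include <cstddef>
#include <string>

// Hypothetical helper type for illustration; not part of the Pulsar client API.
struct ServiceUrl {
    std::string scheme;  // "pulsar" or "pulsar+ssl"
    std::string host;
    int port;
    bool tls;
};

// Splits "scheme://host[:port]" into its parts.
// Returns false when the scheme is unknown or the host/port is malformed;
// `out` is only modified on success.
bool parseServiceUrl(const std::string& url, ServiceUrl& out) {
    const std::string sep = "://";
    const std::size_t schemeEnd = url.find(sep);
    if (schemeEnd == std::string::npos) return false;

    ServiceUrl parsed;
    parsed.scheme = url.substr(0, schemeEnd);
    if (parsed.scheme == "pulsar") {
        parsed.tls = false;
        parsed.port = 6650;  // default port stated in the text
    } else if (parsed.scheme == "pulsar+ssl") {
        parsed.tls = true;
        parsed.port = 6651;  // assumption: port used in the TLS example URL
    } else {
        return false;
    }

    const std::string rest = url.substr(schemeEnd + sep.size());
    const std::size_t colon = rest.rfind(':');
    parsed.host = rest.substr(0, colon);  // whole string when there is no ':'
    if (parsed.host.empty()) return false;

    if (colon != std::string::npos) {
        const std::string portText = rest.substr(colon + 1);
        if (portText.empty() || portText.size() > 5) return false;
        int port = 0;
        for (std::size_t i = 0; i < portText.size(); ++i) {
            const char c = portText[i];
            if (c < '0' || c > '9') return false;
            port = port * 10 + (c - '0');
        }
        if (port == 0 || port > 65535) return false;
        parsed.port = port;
    }

    out = parsed;
    return true;
}
```

A caller could run `parseServiceUrl(url, parsed)` on a configured URL and, for instance, only enable TLS settings on the client configuration when `parsed.tls` is true.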

Global vs. cluster-specific topics

Pulsar topics can be either cluster specific or global. Cluster-specific topic URLs have this structure:

persistent://property/cluster/namespace/topic

If the topic that a client needs to publish to or consume from is specific to a cluster, the client will need to use the broker service URL that you assigned to that cluster when initializing its metadata.

If the topic is global, however, the URL for the topic will look like this:

persistent://property/global/namespace/topic

In that case, your client can use the broker service URL for any cluster in the instance and Pulsar’s internal service discovery system will handle the rest.
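The difference between the two topic forms comes down to the second path component. The sketch below splits a topic URL into its parts and reports whether it is global. `TopicName` and `parseTopicName` are hypothetical names used only for illustration and are not taken from the Pulsar client API; the sketch also assumes that everything after the namespace component is the topic name.

```cpp
#include <cstddef>
#include <string>

// Hypothetical helper type for illustration; not part of the Pulsar client API.
struct TopicName {
    std::string domain;         // e.g. "persistent"
    std::string property;
    std::string cluster;        // "global" for global topics
    std::string namespaceName;
    std::string topic;

    bool isGlobal() const { return cluster == "global"; }
};

// Parses "domain://property/cluster/namespace/topic".
// Returns false if any component is missing; `out` is only modified on success.
bool parseTopicName(const std::string& name, TopicName& out) {
    const std::string sep = "://";
    const std::size_t domainEnd = name.find(sep);
    if (domainEnd == std::string::npos || domainEnd == 0) return false;

    TopicName parsed;
    parsed.domain = name.substr(0, domainEnd);

    // Read the first three '/'-separated components in order.
    const std::string rest = name.substr(domainEnd + sep.size());
    std::string* fields[3] = {&parsed.property, &parsed.cluster, &parsed.namespaceName};
    std::size_t start = 0;
    for (int i = 0; i < 3; ++i) {
        const std::size_t slash = rest.find('/', start);
        if (slash == std::string::npos || slash == start) return false;
        *fields[i] = rest.substr(start, slash - start);
        start = slash + 1;
    }

    // Assumption: whatever follows the namespace is the topic name.
    parsed.topic = rest.substr(start);
    if (parsed.topic.empty()) return false;

    out = parsed;
    return true;
}
```

With a helper like this, an application could pick the broker service URL of the cluster named in `cluster` for cluster-specific topics, and fall back to any cluster's URL when `isGlobal()` returns true.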

Consumer

Client client("pulsar://localhost:6650");

Consumer consumer;
Result result = client.subscribe("persistent://sample/standalone/ns1/my-topic", "my-subscription-name", consumer);
if (result != ResultOk) {
    LOG_ERROR("Failed to subscribe: " << result);
    return -1;
}

Message msg;

while (true) {
    consumer.receive(msg);
    LOG_INFO("Received: " << msg << "  with payload '" << msg.getDataAsString() << "'");

    consumer.acknowledge(msg);
}

client.close();

Producer

Client client("pulsar://localhost:6650");

Producer producer;
Result result = client.createProducer("persistent://sample/standalone/ns1/my-topic", producer);
if (result != ResultOk) {
    LOG_ERROR("Error creating producer: " << result);
    return -1;
}

// Publish 10 messages to the topic
for (int i = 0; i < 10; i++) {
    Message msg = MessageBuilder().setContent("my-message").build();
    Result res = producer.send(msg);
    LOG_INFO("Message sent: " << res);
}
client.close();

Authentication

// Enable TLS and point the client at the CA certificate used to verify the broker
ClientConfiguration config;
config.setUseTls(true);
config.setTlsTrustCertsFilePath("/path/to/cacert.pem");
config.setTlsAllowInsecureConnection(false);

// Client certificate and key passed to the TLS authentication plugin
ParamMap params;
params["tlsCertFile"] = "/path/to/client-cert.pem";
params["tlsKeyFile"]  = "/path/to/client-key.pem";
AuthenticationPtr auth = pulsar::AuthFactory::create("/path/to/libauthtls.so", params);
config.setAuth(auth);

// Connect using the pulsar+ssl scheme
Client client("pulsar+ssl://my-broker.com:6651", config);