Pulsar C++ client

You can use a Pulsar C++ client to create producers, consumers, and readers.

All methods of the producer, consumer, and reader in the C++ client are thread-safe. For details, see the API docs for the C++ client.

Installation

Use one of the following methods to install a Pulsar C++ client.

Brew

Use Homebrew to install the latest tagged version with the library and headers:

brew install libpulsar

Deb

  1. Download any one of the Deb packages:
wget https://archive.apache.org/dist/pulsar/pulsar-2.7.5/DEB/apache-pulsar-client.deb

This package contains shared libraries libpulsar.so and libpulsarnossl.so.

  2. Install the package using the following command:
apt install ./apache-pulsar-client*.deb

Now, you can see Pulsar C++ client libraries installed under the /usr/lib directory.

RPM

  1. Download any one of the RPM packages:
wget https://archive.apache.org/dist/pulsar/pulsar-2.7.5/RPMS/apache-pulsar-client-2.7.5-1.x86_64.rpm

This package contains shared libraries: libpulsar.so and libpulsarnossl.so.

  2. Install the package using the following command:
rpm -ivh apache-pulsar-client*.rpm

Now, you can see Pulsar C++ client libraries installed under the /usr/lib directory.

note

If you get an error like "libpulsar.so: cannot open shared object file: No such file or directory" when starting a Pulsar client, you need to run ldconfig first.

Source

To build the Pulsar C++ client from source code on different platforms, see compilation.

Connection URLs

To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL.

Pulsar protocol URLs are assigned to specific clusters and use the pulsar URI scheme. The default port is 6650. The following is an example for localhost.


pulsar://localhost:6650

In a Pulsar cluster in production, the URL looks as follows:


pulsar://pulsar.us-west.example.com:6650

If you use TLS, replace the pulsar scheme with pulsar+ssl. The default port is then 6651. The following is an example.


pulsar+ssl://pulsar.us-west.example.com:6651
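The scheme and default-port rules above can be captured in a few lines. The following is a minimal sketch using only the standard library; makeServiceUrl is a hypothetical helper introduced for illustration and is not part of the Pulsar client API.

```cpp
#include <string>

// Hypothetical helper (not part of the Pulsar client API): builds a service URL
// from a host name, using the scheme and default port described above.
std::string makeServiceUrl(const std::string& host, bool useTls) {
    const std::string scheme = useTls ? "pulsar+ssl://" : "pulsar://";
    const int port = useTls ? 6651 : 6650;  // default ports for TLS and plain connections
    return scheme + host + ":" + std::to_string(port);
}
```

The returned string can be passed as the first argument when constructing a Client. A cluster that listens on non-default ports would need the port as an extra parameter.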

Create a consumer

To connect to Pulsar as a consumer, you need to create a consumer on the C++ client. The following is an example.


#include <iostream>
#include <pulsar/Client.h>

using namespace pulsar;

int main() {
    Client client("pulsar://localhost:6650");

    Consumer consumer;
    Result result = client.subscribe("my-topic", "my-subscription-name", consumer);
    if (result != ResultOk) {
        std::cerr << "Failed to subscribe: " << result << std::endl;
        return -1;
    }

    Message msg;

    // Receive and acknowledge messages until the process is stopped.
    while (true) {
        consumer.receive(msg);
        std::cout << "Received: " << msg
                  << " with payload '" << msg.getDataAsString() << "'" << std::endl;

        consumer.acknowledge(msg);
    }

    client.close();
}
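The loop in the example above never exits, so the final client.close() call is never reached. One possible way to bound such a loop is to stop after a number of consecutive empty polls. The following is a sketch of that idea, written against a generic callback so that it compiles without the Pulsar headers; receiveUntilIdle, tryReceive, and maxIdlePolls are hypothetical names chosen for illustration.

```cpp
#include <functional>
#include <string>
#include <vector>

// Hypothetical helper: polls `tryReceive` until it fails `maxIdlePolls` times in a row.
// `tryReceive` returns true and fills `payload` when a message arrived, false otherwise.
std::vector<std::string> receiveUntilIdle(
    const std::function<bool(std::string&)>& tryReceive, int maxIdlePolls) {
    std::vector<std::string> payloads;
    int idlePolls = 0;
    while (idlePolls < maxIdlePolls) {
        std::string payload;
        if (tryReceive(payload)) {
            payloads.push_back(payload);
            idlePolls = 0;  // reset the counter after every successful receive
        } else {
            ++idlePolls;
        }
    }
    return payloads;
}
```

With the real client, tryReceive could be a lambda that calls consumer.receive(msg, timeoutMs), returns false unless the result is ResultOk, and otherwise copies msg.getDataAsString() into the payload and acknowledges the message.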

Create a producer

To connect to Pulsar as a producer, you need to create a producer on the C++ client. The following is an example.


#include <iostream>
#include <pulsar/Client.h>

using namespace pulsar;

int main() {
    Client client("pulsar://localhost:6650");

    Producer producer;
    Result result = client.createProducer("my-topic", producer);
    if (result != ResultOk) {
        std::cerr << "Error creating producer: " << result << std::endl;
        return -1;
    }

    // Publish 10 messages to the topic
    for (int i = 0; i < 10; i++) {
        Message msg = MessageBuilder().setContent("my-message").build();
        Result res = producer.send(msg);
        std::cout << "Message sent: " << res << std::endl;
    }
    client.close();
}
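The example above publishes the same payload ten times, which makes individual messages hard to tell apart on the consumer side. The following is a small sketch, using only the standard library, of generating numbered payloads; makePayloads is a hypothetical helper, not part of the Pulsar client API.

```cpp
#include <string>
#include <vector>

// Hypothetical helper: returns `count` payloads of the form "<prefix>-<index>",
// with the index starting at 0.
std::vector<std::string> makePayloads(const std::string& prefix, int count) {
    std::vector<std::string> payloads;
    for (int i = 0; i < count; ++i) {
        payloads.push_back(prefix + "-" + std::to_string(i));
    }
    return payloads;
}
```

Each resulting string could then be passed to MessageBuilder().setContent(...) inside the publishing loop in place of the fixed "my-message" literal.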

Enable authentication in connection URLs

If you use TLS authentication when connecting to Pulsar, use the pulsar+ssl scheme in the connection URL (the default port is 6651) and pass the TLS settings to the client through ClientConfiguration. The following is an example.


ClientConfiguration config = ClientConfiguration();
config.setUseTls(true);
config.setTlsTrustCertsFilePath("/path/to/cacert.pem");
config.setTlsAllowInsecureConnection(false);
// Authenticate with the client certificate and its private key
config.setAuth(pulsar::AuthTls::create(
    "/path/to/client-cert.pem", "/path/to/client-key.pem"));

Client client("pulsar+ssl://my-broker.com:6651", config);

For complete examples, refer to C++ client examples.

Schema

This section provides examples of using schemas with the C++ client. For more information about schema, see Pulsar schema.

Create producer with Avro schema

The following example shows how to create a producer with an Avro schema.


static const std::string exampleSchema =
    "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
    "\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";
Producer producer;
ProducerConfiguration producerConf;
producerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema));
client.createProducer("topic-avro", producerConf, producer);

Create consumer with Avro schema

The following example shows how to create a consumer with an Avro schema.


static const std::string exampleSchema =
    "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
    "\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";
ConsumerConfiguration consumerConf;
Consumer consumer;
consumerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema));
client.subscribe("topic-avro", "sub-2", consumerConf, consumer);
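The escaped quotes in the schema definitions above are easy to get wrong by hand. As an alternative sketch, assuming a C++11 or later compiler, the same JSON can be written with raw string literals; exampleSchemaRaw is a name chosen here for illustration.

```cpp
#include <string>

// Same Avro schema as in the examples above, written as two adjacent raw string
// literals (concatenated by the compiler) so the quotes need no escaping.
static const std::string exampleSchemaRaw =
    R"({"type":"record","name":"Example","namespace":"test",)"
    R"("fields":[{"name":"a","type":"int"},{"name":"b","type":"int"}]})";
```

The resulting string is identical to exampleSchema, so it can be passed to SchemaInfo in the same way.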