The Pulsar Java client
The Pulsar Java client can be used both to create Java producers and consumers of messages but also to perform administrative tasks.
The current version of the Java client is 1.20.0-incubating.
Javadoc for the Pulsar client is divided up into two domains, by package:
Package | Description |
---|---|
org.apache.pulsar.client.api |
The producer and consumer API |
org.apache.pulsar.client.admin |
The Java admin API |
This document will focus only on the client API for producing and consuming messages on Pulsar topics. For a guide to using the Java admin client, see The Pulsar admin interface.
Installation
The latest version of the Pulsar Java client library is available via Maven Central. To use the latest version, add the pulsar-client
library to your build configuration.
Maven
If you’re using Maven, add this to your pom.xml
:
<!-- in your <properties> block -->
<pulsar.version>1.20.0-incubating</pulsar.version>
<!-- in your <dependencies> block -->
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>${pulsar.version}</version>
</dependency>
Gradle
If you’re using Gradle, add this to your build.gradle
file:
def pulsarVersion = '1.20.0-incubating'
dependencies {
compile group: 'org.apache.pulsar', name: 'pulsar-client', version: pulsarVersion
}
Connection URLs
To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL.
Pulsar protocol URLs are assigned to specific clusters, use the pulsar
scheme and have a default port of 6650. Here’s an example for localhost
:
pulsar://localhost:6650
A URL for a production Pulsar cluster may look something like this:
pulsar://pulsar.us-west.example.com:6650
If you’re using TLS authentication, the URL will look like something like this:
pulsar+ssl://pulsar.us-west.example.com:6651
Global vs. cluster-specific topics
Pulsar topics can be either cluster specific or global. Cluster-specific topic URLs have this structure:
If the topic that a client needs to publish to or consume from is specific to a cluster, the client will need to use the broker service URL that you assigned to that cluster when initializing its metadata.
If the topic is global, however, the URL for the topic will look like this:
In that case, your client can use the broker service URL for any cluster in the instance and Pulsar’s internal service discovery system will handle the rest.
Client configuration
You can instantiate a PulsarClient
object using just a URL for the target Pulsar cluster, like this:
String pulsarBrokerRootUrl = "pulsar://localhost:6650";
PulsarClient client = PulsarClient.create(pulsarBrokerRootUrl);
This PulsarClient
object will use the default configuration. See the Javadoc for ClientConfiguration
to see how to provide a non-default configuration.
Producers
In Pulsar, producers write messages to topics. You can instantiate a new producer by first instantiating a PulsarClient
, passing it a URL for a Pulsar broker.
String pulsarBrokerRootUrl = "pulsar://localhost:6650";
PulsarClient client = PulsarClient.create(pulsarBrokerRootUrl);
Default broker URLs for standalone clusters
If you’re running a cluster in standalone mode, the broker will be available at the pulsar://localhost:6650
URL by default.
Once you’ve instantiated a PulsarClient
object, you can create a Producer
for a topic.
String topic = "persistent://sample/standalone/ns1/my-topic";
Producer producer = client.createProducer(topic);
You can then send messages to the broker and topic you specified:
// Publish 10 messages to the topic
for (int i = 0; i < 10; i++) {
producer.send("my-message".getBytes());
}
You should always make sure to close your producers, consumers, and clients when they are no longer needed:
producer.close();
consumer.close();
client.close();
Closer operations can also be asynchronous:
producer.asyncClose();
consumer.asyncClose();
clioent.asyncClose();
Configuring producers
If you instantiate a Producer
object specifying only a topic name, as in the example above, the producer will use the default configuration. To use a non-default configuration, you can instantiate the Producer
with a ProducerConfiguration
object as well. Here’s an example configuration:
PulsarClient client = PulsarClient.create(pulsarBrokerRootUrl);
ProducerConfiguration config = new ProducerConfiguration();
config.setBatchingEnabled(true);
config.setSendTimeout(10, TimeUnit.SECONDS);
Producer producer = client.createProducer(topic, config);
Message routing
When using partitioned topics, you can specify the routing mode whenever you publish messages using a producer. For more on specifying a routing mode using the Java client, see the Partitioned Topics guide.
Async send
You can publish messages asynchronously using the Java client. With async send, the producer will put the message in a blocking queue and return immediately. The client library will then send the message to the broker in the background. If the queue is full (max size configurable), the producer could be blocked or fail immediately when calling the API, depending on arguments passed to the producer.
Here’s an example async send operation:
CompletableFuture<MessageId> future = producer.sendAsync("my-async-message".getBytes());
Async send operations return a MessageId
wrapped in a CompletableFuture
.
Consumers
In Pulsar, consumers subscribe to topics and handle messages that producers publish to those topics. You can instantiate a new consumer by first instantiating a PulsarClient
, passing it a URL for a Pulsar broker (we’ll use the client
object from the producer example above).
Once you’ve instantiated a PulsarClient
object, you can create a Consumer
for a topic. You also need to supply a subscription name.
String topic = "persistent://sample/standalone/ns1/my-topic"; // from above
String subscription = "my-subscription";
Consumer consumer = client.subscribe(topic, subscription);
You can then use the receive
method to listen for messages on the topic. This while
loop sets up a long-running listener for the persistent://sample/standalone/ns1/my-topic
topic, prints the contents of any message that’s received, and then acknowledges that the message has been processed:
while (true) {
// Wait for a message
Message msg = consumer.receive();
System.out.println("Received message: " + msg.getData());
// Acknowledge the message so that it can be deleted by broker
consumer.acknowledge(msg);
}
Configuring consumers
If you instantiate a Consumer
object specifying only a topic and subscription name, as in the example above, the consumer will use the default configuration. To use a non-default configuration, you can instantiate the Consumer
with a ConsumerConfiguration
object as well.
Here’s an example configuration:
PulsarClient client = PulsarClient.create(pulsarBrokerRootUrl);
ConsumerConfiguration config = new ConsumerConfiguration();
config.setSubscriptionType(SubscriptionType.Shared);
config.setReceiverQueueSize(10);
Consumer consumer = client.createConsumer(topic, config);
Async receive
The receive
method will receive messages synchronously (the consumer process will be blocked until a message is available). You can also use async receive, which will return immediately with a CompletableFuture
object that completes once a new message is available.
Here’s an example:
CompletableFuture<Message> asyncMessage = consumer.receiveAsync();
Async receive operations return a Message
wrapped in a CompletableFuture
.
Authentication
Pulsar currently supports two authentication schemes: TLS and Athenz. The Pulsar Java client can be used with both.
TLS Authentication
To use TLS, you need to set TLS to true
using the setUseTls
method, point your Pulsar client to a TLS cert path, and provide paths to cert and key files.
Here’s an example configuration:
ClientConfiguration conf = new ClientConfiguration();
conf.setUseTls(true);
conf.setTlsTrustCertsFilePath("/path/to/cacert.pem");
Map<String, String> authParams = new HashMap<>();
authParams.put("tlsCertFile", "/path/to/client-cert.pem");
authParams.put("tlsKeyFile", "/path/to/client-key.pem");
conf.setAuthentication(AuthenticationTls.class.getName(), authParams);
PulsarClient client = PulsarClient.create(
"pulsar+ssl://my-broker.com:6651", conf);
Athenz
To use Athenz as an authentication provider, you need to use TLS and provide values for four parameters in a hash:
tenantDomain
tenantService
providerDomain
privateKey
You can also set an optional keyId
. Here’s an example configuration:
ClientConfiguration conf = new ClientConfiguration();
// Enable TLS
conf.setUseTls(true);
conf.setTlsTrustCertsFilePath("/path/to/cacert.pem");
// Set Athenz auth plugin and its parameters
Map<String, String> authParams = new HashMap<>();
authParams.put("tenantDomain", "shopping"); // Tenant domain name
authParams.put("tenantService", "some_app"); // Tenant service name
authParams.put("providerDomain", "pulsar"); // Provider domain name
authParams.put("privateKey", "file:///path/to/private.pem"); // Tenant private key path
authParams.put("keyId", "v1"); // Key id for the tenant private key (optional, default: "0")
conf.setAuthentication(AuthenticationAthenz.class.getName(), authParams);
PulsarClient client = PulsarClient.create(
"pulsar+ssl://my-broker.com:6651", conf);
Note: privateKey
parameter supports following three patterns format.
file:///path/to/file
file:/path/to/file
data:application/x-pem-file;base64,<base64-encoded value>