Version: 2.10.1

Pulsar adaptor for Apache Kafka

Pulsar provides an easy option for applications that are currently written using the Apache Kafka Java client API.

Using the Pulsar Kafka compatibility wrapper

In an existing application, replace the regular Kafka client dependency with the Pulsar Kafka wrapper. Remove the following dependency in pom.xml:


Then include this dependency for the Pulsar Kafka wrapper:


With the new dependency, the existing code works without any changes. You only need to adjust the configuration so that producers and consumers point to the Pulsar service rather than Kafka and use a specific Pulsar topic.

Using the Pulsar Kafka compatibility wrapper together with the existing Kafka client

When migrating from Kafka to Pulsar, an application might need to use the original Kafka client and the Pulsar Kafka wrapper side by side. In that case, consider using the unshaded Pulsar Kafka client wrapper.


When using this dependency, construct producers with org.apache.kafka.clients.producer.PulsarKafkaProducer instead of org.apache.kafka.clients.producer.KafkaProducer, and construct consumers with org.apache.kafka.clients.consumer.PulsarKafkaConsumer instead of org.apache.kafka.clients.consumer.KafkaConsumer.

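Producer example

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

// Topic needs to be a regular Pulsar topic
String topic = "persistent://public/default/my-topic";

Properties props = new Properties();
// Point to a Pulsar service
props.put("bootstrap.servers", "pulsar://localhost:6650");

props.put("key.serializer", IntegerSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());

Producer<Integer, String> producer = new KafkaProducer<>(props);

// `log` is assumed to be an SLF4J-style logger declared by the enclosing class
for (int i = 0; i < 10; i++) {
    producer.send(new ProducerRecord<Integer, String>(topic, i, "hello-" + i));
    log.info("Message {} sent successfully", i);
}

// Flush pending messages and release resources
producer.close();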


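Consumer example

import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

String topic = "persistent://public/default/my-topic";

Properties props = new Properties();
// Point to a Pulsar service
props.put("bootstrap.servers", "pulsar://localhost:6650");
// group.id maps to the Pulsar subscription name
props.put("group.id", "my-subscription-name");
// Disable auto-commit so that offsets are committed explicitly below
props.put("enable.auto.commit", "false");
props.put("key.deserializer", IntegerDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());

Consumer<Integer, String> consumer = new KafkaConsumer<>(props);

// `log` is assumed to be an SLF4J-style logger declared by the enclosing class
while (true) {
    ConsumerRecords<Integer, String> records = consumer.poll(100);
    records.forEach(record -> {
        log.info("Received record: {}", record);
    });

    // Commit last offset
    consumer.commitSync();
}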

Complete examples

You can find the complete producer and consumer examples here.

Compatibility matrix

Currently the Pulsar Kafka wrapper supports most of the operations offered by the Kafka API.



| Producer Method | Supported | Notes |
|---|---|---|
| Future<RecordMetadata> send(ProducerRecord<K, V> record) | Yes | |
| Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) | Yes | |
| void flush() | Yes | |
| List<PartitionInfo> partitionsFor(String topic) | No | |
| Map<MetricName, ? extends Metric> metrics() | No | |
| void close() | Yes | |
| void close(long timeout, TimeUnit unit) | Yes | |


| Config property | Supported | Notes |
|---|---|---|
| acks | Ignored | Durability and quorum writes are configured at the namespace level. |
| auto.offset.reset | Yes | Defaults to earliest if you do not set it explicitly. |
| compression.type | Yes | Allows gzip and lz4. No snappy. |
| connections.max.idle.ms | Yes | Supports at most 2,147,483,647,000 ms (Integer.MAX_VALUE * 1000) of idle time. |
| linger.ms | Yes | Controls the group commit time when batching messages. |
| max.block.ms | Ignored | Pulsar ordering is maintained even with multiple requests in flight. |
| retries | Ignored | Pulsar client retries with exponential backoff until the send timeout expires. |
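
The limits in the producer configuration table can be checked before the properties are handed to the wrapper. The following is a minimal sketch, not part of the wrapper itself: the class name ProducerConfigCheck and its isSupported method are hypothetical, and the check covers only the two constraints stated in the table (no snappy compression, and the idle-time ceiling of Integer.MAX_VALUE * 1000 ms).

```java
import java.util.Properties;

// Hypothetical helper; it only encodes the limits stated in the table above.
public class ProducerConfigCheck {
    // Idle-time ceiling from the table: Integer.MAX_VALUE * 1000 ms.
    // The long literal avoids 32-bit overflow in the multiplication.
    public static final long MAX_IDLE_MS = Integer.MAX_VALUE * 1000L;

    // Returns true when compression.type and connections.max.idle.ms
    // are absent or fall within the documented limits.
    public static boolean isSupported(Properties props) {
        String compression = props.getProperty("compression.type");
        if ("snappy".equals(compression)) {
            return false; // the table states snappy is not supported
        }
        String idle = props.getProperty("connections.max.idle.ms");
        if (idle != null && Long.parseLong(idle) > MAX_IDLE_MS) {
            return false; // above the supported idle time
        }
        return true;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("compression.type", "lz4");
        props.put("connections.max.idle.ms", "60000");
        System.out.println(MAX_IDLE_MS);        // 2147483647000
        System.out.println(isSupported(props)); // true
    }
}
```

Running such a check at startup surfaces an unsupported setting early instead of at the first send.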


The following table lists consumer APIs.

| Consumer Method | Supported | Notes |
|---|---|---|
| Set<TopicPartition> assignment() | No | |
| Set<String> subscription() | Yes | |
| void subscribe(Collection<String> topics) | Yes | |
| void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) | No | |
| void assign(Collection<TopicPartition> partitions) | No | |
| void subscribe(Pattern pattern, ConsumerRebalanceListener callback) | No | |
| void unsubscribe() | Yes | |
| ConsumerRecords<K, V> poll(long timeoutMillis) | Yes | |
| void commitSync() | Yes | |
| void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) | Yes | |
| void commitAsync() | Yes | |
| void commitAsync(OffsetCommitCallback callback) | Yes | |
| void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) | Yes | |
| void seek(TopicPartition partition, long offset) | Yes | |
| void seekToBeginning(Collection<TopicPartition> partitions) | Yes | |
| void seekToEnd(Collection<TopicPartition> partitions) | Yes | |
| long position(TopicPartition partition) | Yes | |
| OffsetAndMetadata committed(TopicPartition partition) | Yes | |
| Map<MetricName, ? extends Metric> metrics() | No | |
| List<PartitionInfo> partitionsFor(String topic) | No | |
| Map<String, List<PartitionInfo>> listTopics() | No | |
| Set<TopicPartition> paused() | No | |
| void pause(Collection<TopicPartition> partitions) | No | |
| void resume(Collection<TopicPartition> partitions) | No | |
| Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) | No | |
| Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) | No | |
| Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) | No | |
| void close() | Yes | |
| void close(long timeout, TimeUnit unit) | Yes | |
| void wakeup() | No | |


| Config property | Supported | Notes |
|---|---|---|
| group.id | Yes | Maps to a Pulsar subscription name. |
| max.poll.interval.ms | Ignored | Messages are "pushed" from the broker. |
| bootstrap.servers | Yes | Needs to point to a single Pulsar service URL. |
| auto.commit.interval.ms | Ignored | With auto-commit, acks are sent immediately to the broker. |
| auto.offset.reset | Yes | Supports only earliest and latest. |
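
The consumer settings in this table can be assembled in one place. The sketch below is illustrative only: ConsumerConfigSketch and its build method are hypothetical names, not wrapper APIs, and the validation simply mirrors the table's note that only earliest and latest are supported for auto.offset.reset.

```java
import java.util.Properties;

// Hypothetical helper that builds consumer properties for the wrapper.
public class ConsumerConfigSketch {
    // Rejects any auto.offset.reset value other than "earliest" or "latest",
    // then fills in the settings described in the table above.
    public static Properties build(String serviceUrl, String subscription, String offsetReset) {
        if (!"earliest".equals(offsetReset) && !"latest".equals(offsetReset)) {
            throw new IllegalArgumentException(
                "auto.offset.reset must be earliest or latest, got: " + offsetReset);
        }
        Properties props = new Properties();
        props.put("bootstrap.servers", serviceUrl);  // a single Pulsar service URL
        props.put("group.id", subscription);         // maps to the Pulsar subscription name
        props.put("auto.offset.reset", offsetReset);
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("pulsar://localhost:6650", "my-subscription-name", "earliest");
        System.out.println(props.getProperty("group.id")); // my-subscription-name
    }
}
```

The deserializer settings from the consumer example still need to be added before the properties are passed to a consumer.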

Customize Pulsar configurations

You can configure the Pulsar authentication provider directly from the Kafka properties.

Pulsar client properties

| Config property | Default | Notes |
|---|---|---|
| pulsar.authentication.class | | The authentication provider class, for example, org.apache.pulsar.client.impl.auth.AuthenticationTls. |
| [] | | Represents parameters for the Authentication-Plugin. |
| pulsar.authentication.params.string | | String which represents parameters for the Authentication-Plugin, for example, key1:val1,key2:val2. |
| pulsar.use.tls | | Enables TLS transport encryption. |
| [] | | Setting for the TLS trust certificate store. |
| pulsar.tls.allow.insecure.connection | | Accepts self-signed certificates from brokers. |
| [] | | Operations timeout. |
| pulsar.stats.interval.seconds | | Client library stats printing interval. |
| [] | | Number of Netty IO threads to use. |
| [] | | Maximum number of connections to each broker. |
| pulsar.use.tcp.nodelay | | TCP no-delay. |
| pulsar.concurrent.lookup.requests | | Maximum number of concurrent topic lookups. |
| pulsar.max.number.rejected.request.per.connection | | Threshold of errors to forcefully close a connection. |
| [] | | Keep-alive interval for each client-broker connection. |
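
As an illustration of passing Pulsar client settings through Kafka properties, the sketch below enables TLS and TLS authentication. It makes several assumptions: TlsClientConfigSketch and tlsProps are hypothetical names, the pulsar+ssl://localhost:6651 URL and the certificate paths are placeholders, and the tlsCertFile and tlsKeyFile parameter names are the ones AuthenticationTls is assumed to expect. Only property keys that appear in the table are used.

```java
import java.util.Properties;

// Hypothetical helper; URL and file paths passed in are placeholders.
public class TlsClientConfigSketch {
    // Builds properties that turn on TLS transport and TLS authentication.
    public static Properties tlsProps(String serviceUrl, String certFile, String keyFile) {
        Properties props = new Properties();
        props.put("bootstrap.servers", serviceUrl);
        props.put("pulsar.use.tls", "true");
        props.put("pulsar.authentication.class",
                  "org.apache.pulsar.client.impl.auth.AuthenticationTls");
        // key1:val1,key2:val2 format, as described in the table
        props.put("pulsar.authentication.params.string",
                  "tlsCertFile:" + certFile + ",tlsKeyFile:" + keyFile);
        return props;
    }

    public static void main(String[] args) {
        Properties props = tlsProps("pulsar+ssl://localhost:6651",
                                    "/path/to/client-cert.pem",
                                    "/path/to/client-key.pem");
        System.out.println(props.getProperty("pulsar.authentication.params.string"));
    }
}
```

The resulting Properties object would then be extended with serializer or deserializer settings, as in the producer and consumer examples.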

Pulsar producer properties

| Config property | Default | Notes |
|---|---|---|
| [] | | The producer name. |
| [] | | Baseline for the sequence ID of this producer. |
| pulsar.producer.max.pending.messages | | The maximum size of the message queue pending to receive an acknowledgment from the broker. |
| pulsar.producer.max.pending.messages.across.partitions | | The maximum number of pending messages across all the partitions. |
| pulsar.producer.batching.enabled | | Whether automatic batching of messages is enabled for the producer. |
| pulsar.producer.batching.max.messages | | The maximum number of messages in a batch. |
| pulsar.block.if.producer.queue.full | | Whether the producer blocks if the queue is full. |
| [] | | The CryptoReader-Factory (CryptoKeyReaderFactory) class name, which allows the producer to create a CryptoKeyReader. |
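
Pulsar-specific producer settings can be layered on top of an existing Kafka configuration. The following is a rough sketch under stated assumptions: ProducerTuningSketch and withBatching are hypothetical names, the chosen values are illustrative rather than recommended, and the values are passed as strings on the assumption that the wrapper parses them from property strings.

```java
import java.util.Properties;

// Hypothetical helper; values chosen by the caller are illustrative only.
public class ProducerTuningSketch {
    // Returns a copy of the base properties with batching-related
    // Pulsar producer settings added. The base object is not modified.
    public static Properties withBatching(Properties base, int maxMessagesPerBatch) {
        Properties tuned = new Properties();
        tuned.putAll(base);
        tuned.put("pulsar.producer.batching.enabled", "true");
        tuned.put("pulsar.producer.batching.max.messages",
                  Integer.toString(maxMessagesPerBatch));
        // Block the caller instead of failing when the pending queue is full
        tuned.put("pulsar.block.if.producer.queue.full", "true");
        return tuned;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.put("bootstrap.servers", "pulsar://localhost:6650");
        Properties tuned = withBatching(base, 500);
        System.out.println(tuned.getProperty("pulsar.producer.batching.max.messages")); // 500
    }
}
```

Copying into a new Properties object keeps the base configuration reusable for consumers, which do not need the producer keys.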

Pulsar consumer properties

| Config property | Default | Notes |
|---|---|---|
| [] | | The consumer name. |
| pulsar.consumer.receiver.queue.size | | The size of the consumer receiver queue. |
| [] | | The maximum amount of group time for consumers to send the acknowledgments to the broker. |
| [] | | The maximum size of the total receiver queue across partitions. |
| pulsar.consumer.subscription.topics.mode | | The subscription topic mode for consumers. |
| [] | | The CryptoReader-Factory (CryptoKeyReaderFactory) class name, which allows the consumer to create a CryptoKeyReader. |