
Kafka Connector

Source

The Kafka Source Connector pulls messages from Kafka topics and persists them to a Pulsar topic.

Source Configuration Options

| Name | Required | Default | Description |
|------|----------|---------|-------------|
| bootstrapServers | true | null | A list of host/port pairs used to establish the initial connection to the Kafka cluster. |
| groupId | true | null | A unique string that identifies the consumer group this consumer belongs to. |
| fetchMinBytes | false | null | Minimum number of bytes expected for each fetch response. |
| autoCommitEnabled | false | false | If true, the offsets of messages already fetched by the consumer are committed periodically. If the process fails, a new consumer resumes from the last committed offset. |
| autoCommitIntervalMs | false | null | How often, in milliseconds, consumer offsets are auto-committed when autoCommitEnabled is true. |
| sessionTimeoutMs | false | null | The timeout, in milliseconds, used to detect consumer failures when using Kafka's group management facility. |
| topic | true | null | Name of the Kafka topic to receive records from. |
| keySerializerClass | false | org.apache.kafka.common.serialization.StringSerializer | Serializer class for the key. It must implement the org.apache.kafka.common.serialization.Serializer interface. |
| valueSerializerClass | false | org.apache.kafka.common.serialization.StringSerializer | Serializer class for the value. It must implement the org.apache.kafka.common.serialization.Serializer interface. |
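
As a rough illustration of how the source options fit together, the sketch below builds a configuration as a plain Python dictionary and checks that the three required options (bootstrapServers, groupId, topic) are present. The helper `validate_source_config`, the broker address, and the group and topic names are hypothetical values chosen for the example; they are not part of the connector itself.

```python
# Required options, taken from the source configuration table.
REQUIRED_SOURCE_KEYS = ("bootstrapServers", "groupId", "topic")


def validate_source_config(config):
    """Return the sorted list of required options that are missing or empty.

    Hypothetical helper for illustration only; the connector performs its
    own validation when it starts.
    """
    return sorted(key for key in REQUIRED_SOURCE_KEYS if not config.get(key))


# Illustrative values: host, group, and topic names are assumptions.
source_config = {
    "bootstrapServers": "localhost:9092",
    "groupId": "pulsar-kafka-source",
    "topic": "my-kafka-topic",
    "autoCommitEnabled": True,       # commit fetched offsets periodically
    "autoCommitIntervalMs": 5000,    # commit every 5 seconds
    "sessionTimeoutMs": 30000,       # consumer failure detection timeout
}

missing = validate_source_config(source_config)
if missing:
    raise ValueError("missing required source options: " + ", ".join(missing))
```

A dictionary like this would typically be serialized to whatever configuration file format your deployment uses before being handed to the connector.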

Sink

The Kafka Sink Connector pulls messages from Pulsar topics and persists them to a Kafka topic.

Sink Configuration Options

| Name | Required | Default | Description |
|------|----------|---------|-------------|
| acks | true | null | The Kafka producer acks mode. |
| batchSize | true | null | The Kafka producer batch size. |
| maxRequestSize | true | null | The maximum size of a request, in bytes. |
| topic | true | null | Name of the Kafka topic to send records to. |
| keySerializerClass | false | org.apache.kafka.common.serialization.StringSerializer | Serializer class for the key. It must implement the org.apache.kafka.common.serialization.Serializer interface. |
| valueSerializerClass | false | org.apache.kafka.common.serialization.StringSerializer | Serializer class for the value. It must implement the org.apache.kafka.common.serialization.Serializer interface. |
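
The sink options can be sketched the same way. The example below merges user-supplied values over the documented serializer defaults and rejects a configuration that lacks any of the four required options (acks, batchSize, maxRequestSize, topic). The function `build_sink_config` and all concrete values are assumptions made for illustration, not connector APIs or recommended settings.

```python
import json

# Defaults for the optional serializer options, from the sink table.
SINK_DEFAULTS = {
    "keySerializerClass": "org.apache.kafka.common.serialization.StringSerializer",
    "valueSerializerClass": "org.apache.kafka.common.serialization.StringSerializer",
}

# Required options, from the sink table.
REQUIRED_SINK_KEYS = ("acks", "batchSize", "maxRequestSize", "topic")


def build_sink_config(**overrides):
    """Merge overrides onto the defaults and check the required options.

    Hypothetical helper for illustration only.
    """
    config = {**SINK_DEFAULTS, **overrides}
    missing = [key for key in REQUIRED_SINK_KEYS if key not in config]
    if missing:
        raise ValueError("missing required sink options: " + ", ".join(missing))
    return config


# Illustrative values only; tune them for your own cluster.
sink_config = build_sink_config(
    acks="1",
    batchSize=16384,
    maxRequestSize=1048576,
    topic="my-kafka-topic",
)

print(json.dumps(sink_config, indent=2))
```

Because the serializer options are optional, they only need to be passed when the string serializer default is not suitable.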