Kafka sink connector

The Kafka sink connector pulls messages from Pulsar topics and persists the messages to Kafka topics.

This guide explains how to configure and use the Kafka sink connector.

Configuration

The Kafka sink connector supports the following configuration properties.

Property

| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| bootstrapServers | String | true | " " (empty string) | A comma-separated list of host and port pairs for establishing the initial connection to the Kafka cluster. |
| acks | String | true | " " (empty string) | The number of acknowledgments that the producer requires the leader to receive before a request completes. This controls the durability of the sent records. |
| batchSize | long | false | 16384L | The size up to which a Kafka producer batches records together before sending them to brokers. |
| maxRequestSize | long | false | 1048576L | The maximum size of a Kafka request in bytes. |
| topic | String | true | " " (empty string) | The Kafka topic that receives messages from Pulsar. |
| keyDeserializationClass | String | false | org.apache.kafka.common.serialization.StringSerializer | The serializer class for Kafka producers to serialize keys. |
| valueDeserializationClass | String | false | org.apache.kafka.common.serialization.ByteArraySerializer | The serializer class for Kafka producers to serialize values. The serializer is set by a specific implementation of KafkaAbstractSink. |
| producerConfigProperties | Map | false | " " (empty string) | The producer configuration properties to be passed to producers. Note: other properties specified in the connector configuration file take precedence over this configuration. |
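
The precedence rule in the note can be illustrated with a small Python sketch. This is not the connector's actual code: the function name `effective_producer_properties` and the mapping from connector property names to Kafka producer keys are assumptions made for illustration. The sketch starts from `producerConfigProperties` and then overlays the top-level connector properties, so a top-level value wins whenever both are set.

```python
# Hypothetical illustration; the connector's real merge logic may differ.
# Assumed mapping from connector properties to Kafka producer config keys.
TOP_LEVEL_TO_KAFKA = {
    "bootstrapServers": "bootstrap.servers",
    "acks": "acks",
    "batchSize": "batch.size",
    "maxRequestSize": "max.request.size",
    "keyDeserializationClass": "key.serializer",
    "valueDeserializationClass": "value.serializer",
}


def effective_producer_properties(config):
    """Return producer properties with top-level settings taking precedence."""
    # Start from the pass-through properties.
    props = dict(config.get("producerConfigProperties") or {})
    # Overlay top-level connector properties, overriding any duplicates.
    for name, kafka_key in TOP_LEVEL_TO_KAFKA.items():
        if name in config:
            props[kafka_key] = config[name]
    return props


example = {
    "bootstrapServers": "localhost:6667",
    "topic": "test",
    "acks": "1",
    "producerConfigProperties": {"client.id": "test-pulsar-producer", "acks": "all"},
}
props = effective_producer_properties(example)
print(props["acks"])  # the top-level "1" overrides "all"
```

Under this assumption, setting `acks` in both places, as the example configuration in this guide does, would leave the producer with the top-level value.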

Example

Before using the Kafka sink connector, create a configuration file in one of the following formats.

  • JSON


    {
        "bootstrapServers": "localhost:6667",
        "topic": "test",
        "acks": "1",
        "batchSize": "16384",
        "maxRequestSize": "1048576",
        "producerConfigProperties": {
            "client.id": "test-pulsar-producer",
            "security.protocol": "SASL_PLAINTEXT",
            "sasl.mechanism": "GSSAPI",
            "sasl.kerberos.service.name": "kafka",
            "acks": "all"
        }
    }

  • YAML

    configs:
      bootstrapServers: "localhost:6667"
      topic: "test"
      acks: "1"
      batchSize: "16384"
      maxRequestSize: "1048576"
      producerConfigProperties:
        client.id: "test-pulsar-producer"
        security.protocol: "SASL_PLAINTEXT"
        sasl.mechanism: "GSSAPI"
        sasl.kerberos.service.name: "kafka"
        acks: "all"
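
Before deploying, it can help to check that a configuration file sets the properties marked as required in the configuration table. The following is a minimal sketch using only the Python standard library. `missing_required` is a hypothetical helper, not part of Pulsar, and it assumes a JSON file whose settings sit either at the top level (as in the JSON example) or under a `configs` key (as in the YAML example).

```python
import json

# Properties marked as required in the configuration table.
REQUIRED = ("bootstrapServers", "acks", "topic")


def missing_required(config_text):
    """Return required property names that are absent or empty in a JSON config."""
    config = json.loads(config_text)
    # Accept settings either at the top level or nested under "configs".
    settings = config.get("configs", config)
    return [name for name in REQUIRED if not str(settings.get(name, "")).strip()]


sample = '{"bootstrapServers": "localhost:6667", "topic": "test"}'
print(missing_required(sample))  # ['acks']
```

An empty list means all three required properties are present and non-empty. The check does not validate the values themselves, for example whether the brokers are reachable.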