Kafka sink connector
The Kafka sink connector pulls messages from Pulsar topics and persists the messages to Kafka topics.
This guide explains how to configure and use the Kafka sink connector.
Configuration
The configuration of the Kafka sink connector has the following parameters.
Property
Name | Type | Required | Default | Description |
---|---|---|---|---|
bootstrapServers | String | true | " " (empty string) | A comma-separated list of host and port pairs for establishing the initial connection to the Kafka cluster. |
acks | String | true | " " (empty string) | The number of acknowledgments that the producer requires the leader to receive before a request completes. This controls the durability of the sent records. |
batchsize | long | false | 16384L | The batch size that a Kafka producer attempts to batch records together before sending them to brokers. |
maxRequestSize | long | false | 1048576L | The maximum size of a Kafka request in bytes. |
topic | String | true | " " (empty string) | The Kafka topic which receives messages from Pulsar. |
keyDeserializationClass | String | false | org.apache.kafka.common.serialization.StringSerializer | The serializer class for Kafka producers to serialize keys. |
valueDeserializationClass | String | false | org.apache.kafka.common.serialization.ByteArraySerializer | The serializer class for Kafka producers to serialize values. The serializer is set by a specific implementation of KafkaAbstractSink . |
producerConfigProperties | Map | false | " " (empty string) | The producer configuration properties to be passed to producers. Note: other properties specified in the connector configuration file take precedence over this configuration. |
Example
Before using the Kafka sink connector, you need to create a configuration file through one of the following methods.
-
JSON
{
"bootstrapServers": "localhost:6667",
"topic": "test",
"acks": "1",
"batchSize": "16384",
"maxRequestSize": "1048576",
"producerConfigProperties":
{
"client.id": "test-pulsar-producer",
"security.protocol": "SASL_PLAINTEXT",
"sasl.mechanism": "GSSAPI",
"sasl.kerberos.service.name": "kafka",
"acks": "all"
}
} -
YAML
configs:
bootstrapServers: "localhost:6667"
topic: "test"
acks: "1"
batchSize: "16384"
maxRequestSize: "1048576"
producerConfigProperties:
client.id: "test-pulsar-producer"
security.protocol: "SASL_PLAINTEXT"
sasl.mechanism: "GSSAPI"
sasl.kerberos.service.name: "kafka"
acks: "all"