Debezium source connector
You can download all the Pulsar connectors from the download page.
The Debezium source connector pulls messages from MySQL or PostgreSQL and persists the messages to Pulsar topics.
Configuration
The configuration of the Debezium source connector has the following properties.
Name | Required | Default | Description |
---|---|---|---|
task.class | true | null | The source task class that is implemented in Debezium. |
database.hostname | true | null | The address of a database server. |
database.port | true | null | The port number of a database server. |
database.user | true | null | The name of a database user that has the required privileges. |
database.password | true | null | The password for a database user that has the required privileges. |
database.server.id | true | null | The connector's identifier, which must be unique within a database cluster. It is similar to the database's server-id configuration property. |
database.server.name | true | null | The logical name of a database server or cluster. It forms a namespace and is used in the names of all Kafka topics to which the connector writes, in the Kafka Connect schema names, and in the namespaces of the corresponding Avro schema when the Avro Connector is used. |
database.whitelist | false | null | A list of all databases hosted by this server that are monitored by the connector. This is optional, and there are other properties for listing databases and tables to include or exclude from monitoring. |
key.converter | true | null | The converter provided by Kafka Connect to convert the record key. |
value.converter | true | null | The converter provided by Kafka Connect to convert the record value. |
database.history | true | null | The name of the database history class. |
database.history.pulsar.topic | true | null | The name of the database history topic where the connector writes and recovers DDL statements. Note: this topic is for internal use only and should not be used by consumers. |
database.history.pulsar.service.url | false | null | The Pulsar cluster service URL for the history topic. Note: if database.history.pulsar.service.url is not set, the database history Pulsar client uses the same client settings as the source connector, such as client_auth_plugin and client_auth_params. |
offset.storage.topic | true | null | The topic that records the last offsets the connector has successfully committed. |
json-with-envelope | false | false | Whether the message includes the schema envelope. If false (the default), the message consists of the payload only. |
database.history.pulsar.reader.config | false | null | The configs of the reader for the database schema history topic, in the form of a JSON string with key-value pairs. |
offset.storage.reader.config | false | null | The configs of the reader for the Kafka connector offsets topic, in the form of a JSON string with key-value pairs. |
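As a quick sanity check before submitting a config, the Required column of the table above can be turned into a small lookup. The following Python sketch is only an illustration: the helper name missing_required and the partial sample config are hypothetical, and whether the connector itself enforces each property (or fills in a default for some of them) is not verified here.

```python
# Property names marked "true" in the Required column of the table above.
REQUIRED = (
    "task.class",
    "database.hostname",
    "database.port",
    "database.user",
    "database.password",
    "database.server.id",
    "database.server.name",
    "key.converter",
    "value.converter",
    "database.history",
    "database.history.pulsar.topic",
    "offset.storage.topic",
)


def missing_required(configs: dict) -> list:
    """Return the required property names that are absent or empty in configs."""
    return [name for name in REQUIRED if configs.get(name) in (None, "")]


# Hypothetical, deliberately incomplete config used only for illustration.
partial = {"database.hostname": "localhost", "database.port": "3306"}
print(missing_required(partial))
```

Treat a non-empty result as a prompt to review the config, not as proof that the connector will reject it.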
Converter Options
- org.apache.kafka.connect.json.JsonConverter
  The json-with-envelope config is valid only for the JsonConverter. The default value is false. In that case, the consumer uses the schema Schema.KeyValue(Schema.AUTO_CONSUME(), Schema.AUTO_CONSUME(), KeyValueEncodingType.SEPARATED), and the message consists of the payload only.
  If json-with-envelope is set to true, the consumer uses the schema Schema.KeyValue(Schema.BYTES, Schema.BYTES), and the message consists of the schema and the payload.
- org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroConverter
  If you select the AvroConverter, the Pulsar consumer should use the schema Schema.KeyValue(Schema.AUTO_CONSUME(), Schema.AUTO_CONSUME(), KeyValueEncodingType.SEPARATED), and the message consists of the payload.
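To make the two json-with-envelope settings more concrete, the following Python sketch unwraps a JSON value that may or may not carry an envelope. It assumes the envelope is a top-level JSON object with schema and payload fields, which is the shape the Kafka Connect JsonConverter produces when schemas are enabled. The helper name extract_payload and the sample bytes are hypothetical; real change events carry many more fields.

```python
import json


def extract_payload(raw: bytes) -> dict:
    """Return the payload of a JSON-encoded change event.

    An object with both "schema" and "payload" keys is treated as an
    envelope and only its payload is returned. Anything else is assumed
    to already be the payload.
    """
    event = json.loads(raw.decode("utf-8"))
    if isinstance(event, dict) and "schema" in event and "payload" in event:
        return event["payload"]
    return event


# Hypothetical sample values, not captured from a real connector.
with_envelope = b'{"schema": {"type": "struct"}, "payload": {"id": 1004}}'
payload_only = b'{"id": 1004}'

print(extract_payload(with_envelope))  # {'id': 1004}
print(extract_payload(payload_only))   # {'id': 1004}
```

A consumer that reads raw bytes could apply a helper like this to each key and value, so downstream code sees the same structure regardless of the setting.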
MongoDB Configuration
Name | Required | Default | Description |
---|---|---|---|
mongodb.hosts | true | null | The comma-separated list of hostname and port pairs (in the form 'host' or 'host:port') of the MongoDB servers in the replica set. The list can contain a single hostname and port pair. If mongodb.members.auto.discover is set to false, the host and port pair is prefixed with the replica set name (e.g., rs0/localhost:27017). |
mongodb.name | true | null | A unique name that identifies the connector and/or the MongoDB replica set or sharded cluster that this connector monitors. Each server should be monitored by at most one Debezium connector, since this server name prefixes all persisted Kafka topics emanating from the MongoDB replica set or cluster. |
mongodb.user | true | null | Name of the database user to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication. |
mongodb.password | true | null | Password to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication. |
mongodb.task.id | true | null | The taskId of the MongoDB connector that attempts to use a separate task for each replica set. |
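The mongodb.hosts format described above (an optional replica set prefix, then comma-separated 'host' or 'host:port' entries) can be illustrated with a small parser. This is a sketch for checking a value by eye, not the connector's own parsing logic. The helper name parse_mongodb_hosts is hypothetical, falling back to port 27017 (MongoDB's standard port) when the port is omitted is an assumption, and IPv6 literals are not handled.

```python
DEFAULT_MONGODB_PORT = 27017  # MongoDB's standard port; using it as a fallback is an assumption


def parse_mongodb_hosts(value: str):
    """Split a mongodb.hosts value into (replica_set_name, [(host, port), ...])."""
    replica_set = None
    if "/" in value:
        # Form "rs0/host:port": everything before the first slash is the replica set name.
        replica_set, value = value.split("/", 1)
    hosts = []
    for item in value.split(","):
        item = item.strip()
        if not item:
            continue
        host, sep, port = item.partition(":")
        hosts.append((host, int(port) if sep else DEFAULT_MONGODB_PORT))
    return replica_set, hosts


print(parse_mongodb_hosts("rs0/localhost:27017"))
# ('rs0', [('localhost', 27017)])
print(parse_mongodb_hosts("mongo1,mongo2:27018"))
# (None, [('mongo1', 27017), ('mongo2', 27018)])
```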
Customize the Reader config for the metadata topics
The Debezium connector exposes database.history.pulsar.reader.config and offset.storage.reader.config to configure the readers of the database schema history topic and the Kafka connector offsets topic. For example, you can use them to set the subscription name and other reader configurations. You can find the available configurations at ReaderConfigurationData.
For example, to configure the subscription name for both readers, you can add the following configuration:
- JSON
    {
      "configs": {
        "database.history.pulsar.reader.config": "{\"subscriptionName\":\"history-reader\"}",
        "offset.storage.reader.config": "{\"subscriptionName\":\"offset-reader\"}"
      }
    }
- YAML
    configs:
      database.history.pulsar.reader.config: "{\"subscriptionName\":\"history-reader\"}"
      offset.storage.reader.config: "{\"subscriptionName\":\"offset-reader\"}"
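Because each reader config is a JSON string nested inside another document, hand-escaping the inner quotes is easy to get wrong. A minimal Python sketch of generating the values instead is shown below. The helper name reader_config is hypothetical, and only subscriptionName is taken from the example above.

```python
import json


def reader_config(**settings) -> str:
    """Serialize reader settings into a compact JSON string."""
    return json.dumps(settings, separators=(",", ":"))


source_config = {
    "configs": {
        "database.history.pulsar.reader.config": reader_config(
            subscriptionName="history-reader"
        ),
        "offset.storage.reader.config": reader_config(
            subscriptionName="offset-reader"
        ),
    }
}

# Dumping the outer document escapes the inner quotes automatically.
print(json.dumps(source_config, indent=2))
```

Serializing twice keeps the inner value a string, which matches the "JSON string with key-value pairs" form described in the configuration table.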
Example of MySQL
You need to create a configuration file before using the Pulsar Debezium connector.
Configuration
You can use one of the following methods to create a configuration file.
- JSON
    {
      "configs": {
        "database.hostname": "localhost",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184054",
        "database.server.name": "dbserver1",
        "database.whitelist": "inventory",
        "database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory",
        "database.history.pulsar.topic": "history-topic",
        "database.history.pulsar.service.url": "pulsar://127.0.0.1:6650",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "offset.storage.topic": "offset-topic"
      }
    }
- YAML
  You can create a debezium-mysql-source-config.yaml file and copy the contents below to the debezium-mysql-source-config.yaml file.
    tenant: "public"
    namespace: "default"
    name: "debezium-mysql-source"
    topicName: "debezium-mysql-topic"
    archive: "connectors/pulsar-io-debezium-mysql-4.0.1.nar"
    parallelism: 1
    configs:
      ## config for mysql, docker image: debezium/example-mysql:0.8
      database.hostname: "localhost"
      database.port: "3306"
      database.user: "debezium"
      database.password: "dbz"
      database.server.id: "184054"
      database.server.name: "dbserver1"
      database.whitelist: "inventory"
      database.history: "org.apache.pulsar.io.debezium.PulsarDatabaseHistory"
      database.history.pulsar.topic: "history-topic"
      database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
      ## KEY_CONVERTER_CLASS_CONFIG, VALUE_CONVERTER_CLASS_CONFIG
      key.converter: "org.apache.kafka.connect.json.JsonConverter"
      value.converter: "org.apache.kafka.connect.json.JsonConverter"
      ## OFFSET_STORAGE_TOPIC_CONFIG
      offset.storage.topic: "offset-topic"
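With this configuration, change events for each captured table are expected to land on their own topic. The following Python sketch shows how such a topic name could be derived, assuming the connector follows Debezium's usual serverName.databaseName.tableName convention and publishes into the tenant and namespace given in the config. The helper name change_event_topic and the table name products are assumptions used for illustration.

```python
def change_event_topic(tenant: str, namespace: str, server_name: str,
                       database: str, table: str) -> str:
    """Build the assumed Pulsar topic name for one captured table."""
    return f"persistent://{tenant}/{namespace}/{server_name}.{database}.{table}"


# Values from the example config above; "products" is an assumed table name.
print(change_event_topic("public", "default", "dbserver1", "inventory", "products"))
# persistent://public/default/dbserver1.inventory.products
```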