Canal source connector
You can download all the Pulsar connectors on the download page.
The Canal source connector pulls messages from MySQL to Pulsar topics.
Configuration
The configuration of the Canal source connector has the following properties.
Property
| Name | Required | Default | Description | 
|---|---|---|---|
| username | true | None | Canal server account (not MySQL). | 
| password | true | None | Canal server password (not MySQL). | 
| destination | true | None | Source destination that the Canal source connector connects to. |
| singleHostname | false | None | Canal server address. | 
| singlePort | false | None | Canal server port. | 
| cluster | true | false | Whether to enable cluster mode based on the Canal server configuration. If set to true, the connector talks to zkServers to figure out the actual Canal server host. If set to false, it connects to the Canal server specified by singleHostname and singlePort. |
| zkServers | true | None | Address and port of the ZooKeeper that the Canal source connector talks to in order to figure out the actual Canal server host. |
| batchSize | false | 1000 | Batch size to fetch from Canal. | 
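The cluster flag decides which of the other connection properties matter. The following is a minimal sketch of that rule, assuming the properties have been loaded into a Python dict using the names from the table. The helpers resolve_canal_endpoint and effective_batch_size are hypothetical and are not part of the connector.

```python
from typing import Any, Dict, Tuple, Union


def resolve_canal_endpoint(config: Dict[str, Any]) -> Union[str, Tuple[str, int]]:
    """Hypothetical helper: pick the Canal endpoint the way the table describes.

    Returns the zkServers string in cluster mode, else a (host, port) tuple.
    """
    for key in ("username", "password", "destination"):
        if key not in config:
            raise ValueError(f"missing required property: {key}")
    cluster = config.get("cluster", False)  # table default: false
    if isinstance(cluster, str):  # tolerate "true"/"false" written as strings
        cluster = cluster.strip().lower() == "true"
    if cluster:
        zk_servers = config.get("zkServers") or ""
        if not zk_servers:
            raise ValueError("cluster mode requires zkServers")
        return zk_servers
    host = config.get("singleHostname") or ""
    port = config.get("singlePort")
    if not host or port in (None, ""):
        raise ValueError("non-cluster mode requires singleHostname and singlePort")
    # The examples write singlePort both as a number and as a string.
    return host, int(port)


def effective_batch_size(config: Dict[str, Any]) -> int:
    """batchSize is optional and defaults to 1000 according to the table."""
    return int(config.get("batchSize", 1000))
```

With cluster set to false, zkServers is ignored in this sketch, even though the table lists it as required.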
Example
Before using the Canal connector, you can create a configuration file through one of the following methods.
- JSON
{
    "zkServers": "127.0.0.1:2181",
    "batchSize": "5120",
    "destination": "example",
    "username": "",
    "password": "",
    "cluster": false,
    "singleHostname": "127.0.0.1",
    "singlePort": "11111"
}
- YAML
You can create a YAML file and copy the contents below into it.
configs:
 zkServers: "127.0.0.1:2181"
 batchSize: 5120
 destination: "example"
 username: ""
 password: ""
 cluster: false
 singleHostname: "127.0.0.1"
 singlePort: 11111
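When JSON is written by hand, it is easy to leave a trailing comma after the last property, and standard JSON parsers reject that. The following is a minimal sketch that assumes you generate the JSON form from Python. The standard json module emits the same properties from the JSON example as valid JSON. CANAL_CONFIG and render_json_config are hypothetical names.

```python
import json

# Values copied from the JSON example; adjust them for your environment.
CANAL_CONFIG = {
    "zkServers": "127.0.0.1:2181",
    "batchSize": "5120",
    "destination": "example",
    "username": "",
    "password": "",
    "cluster": False,  # json.dumps writes this as the literal false
    "singleHostname": "127.0.0.1",
    "singlePort": "11111",
}


def render_json_config(config: dict) -> str:
    """Serialize the properties as indented JSON (json.dumps never emits trailing commas)."""
    return json.dumps(config, indent=4)


if __name__ == "__main__":
    # Print to stdout so the output can be redirected into a file.
    print(render_json_config(CANAL_CONFIG))
```

For example, `python make_canal_config.py > canal-source-config.json` writes the file. Both file names are hypothetical.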
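The YAML form nests every property under a top-level configs key. The following is a minimal sketch that renders a flat dict of properties in that shape using only the standard library. It assumes every value is a string, an integer, or a boolean, and to_configs_yaml is a hypothetical helper. Strings are quoted with json.dumps, which produces valid YAML double-quoted scalars for simple text like these values.

```python
import json
from typing import Dict, Union

Scalar = Union[str, int, bool]


def to_configs_yaml(config: Dict[str, Scalar]) -> str:
    """Render flat connector properties under a top-level `configs:` key."""
    lines = ["configs:"]
    for key, value in config.items():
        if isinstance(value, bool):  # check bool first: bool is a subclass of int
            rendered = "true" if value else "false"
        elif isinstance(value, int):
            rendered = str(value)
        elif isinstance(value, str):
            rendered = json.dumps(value)  # double-quoted, escapes quotes and backslashes
        else:
            raise TypeError(f"unsupported value for {key}: {value!r}")
        lines.append(f"  {key}: {rendered}")
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    print(to_configs_yaml({
        "zkServers": "127.0.0.1:2181",
        "batchSize": 5120,
        "destination": "example",
        "username": "",
        "password": "",
        "cluster": False,
        "singleHostname": "127.0.0.1",
        "singlePort": 11111,
    }), end="")
```

The sketch deliberately handles only flat scalar values, which covers every property in the table. Nested structures would call for a real YAML library.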
Usage
Here is an example of streaming MySQL data changes into Pulsar with the Canal source connector, using a configuration file similar to the one above.
- Start a MySQL server.
docker pull mysql:5.7
 docker run -d -it --rm --name pulsar-mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=canal -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw mysql:5.7
- Create a configuration file mysqld.cnf.
[mysqld]
 pid-file = /var/run/mysqld/mysqld.pid
 socket = /var/run/mysqld/mysqld.sock
 datadir = /var/lib/mysql
 #log-error = /var/log/mysql/error.log
 # By default we only accept connections from localhost
 #bind-address = 127.0.0.1
 # Disabling symbolic-links is recommended to prevent assorted security risks
 symbolic-links=0
 log-bin=mysql-bin
 binlog-format=ROW
 server_id=1
- Copy the configuration file mysqld.cnf to the MySQL server.
docker cp mysqld.cnf pulsar-mysql:/etc/mysql/mysql.conf.d/
- Restart the MySQL server.
docker restart pulsar-mysql
- Create a test database in the MySQL server.
docker exec -it pulsar-mysql /bin/bash
mysql -h 127.0.0.1 -uroot -pcanal -e 'create database test;'
- Start a Canal server and connect it to the MySQL server.
docker pull canal/canal-server:v1.1.2
 docker run -d -it --link pulsar-mysql -e canal.auto.scan=false -e canal.destinations=test -e canal.instance.master.address=pulsar-mysql:3306 -e canal.instance.dbUsername=root -e canal.instance.dbPassword=canal -e canal.instance.connectionCharset=UTF-8 -e canal.instance.tsdb.enable=true -e canal.instance.gtidon=false --name=pulsar-canal-server -p 8000:8000 -p 2222:2222 -p 11111:11111 -p 11112:11112 -m 4096m canal/canal-server:v1.1.2
- Start Pulsar standalone.
docker pull apachepulsar/pulsar:4.1.1
 docker run --user 0 -d -it --link pulsar-canal-server -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-standalone apachepulsar/pulsar:4.1.1 bin/pulsar standalone
- Create the configuration file canal-mysql-source-config.yaml with the following content.
configs:
 zkServers: ""
 batchSize: "5120"
 destination: "test"
 username: ""
 password: ""
 cluster: false
 singleHostname: "pulsar-canal-server"
 singlePort: "11111"
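- Create a consumer file pulsar-client.py.
import pulsar
client = pulsar.Client('pulsar://localhost:6650')
# Subscribe to the topic that the Canal source writes to (--destination-topic-name).
consumer = client.subscribe('my-topic',
                            subscription_name='my-sub')
try:
    while True:
        # Block until the next message arrives, then print it as text.
        msg = consumer.receive()
        print("Received message: '%s'" % msg.data().decode('utf-8', errors='replace'))
        # Acknowledge the message so it is not redelivered to this subscription.
        consumer.acknowledge(msg)
except KeyboardInterrupt:
    pass  # stop the consumer with Ctrl+C
finally:
    client.close()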
- Copy the configuration file canal-mysql-source-config.yaml and the consumer file pulsar-client.py to the Pulsar server.
docker cp canal-mysql-source-config.yaml pulsar-standalone:/pulsar/conf/
docker cp pulsar-client.py pulsar-standalone:/pulsar/
- Download a Canal connector and start it.
docker exec -it pulsar-standalone /bin/bash
curl -LO --output-dir connectors "https://www.apache.org/dyn/closer.lua/pulsar/pulsar-4.1.1/connectors/pulsar-io-canal-4.1.1.nar?action=download"
./bin/pulsar-admin source localrun \
   --archive $PWD/connectors/pulsar-io-canal-4.1.1.nar \
   --classname org.apache.pulsar.io.canal.CanalStringSource \
   --tenant public \
   --namespace default \
   --name canal \
   --destination-topic-name my-topic \
   --source-config-file /pulsar/conf/canal-mysql-source-config.yaml \
   --parallelism 1
- Consume data from MySQL.
docker exec -it pulsar-standalone /bin/bash
python pulsar-client.py
- Open another window to log in to the MySQL server.
docker exec -it pulsar-mysql /bin/bash
mysql -h 127.0.0.1 -uroot -pcanal
- Create a table, and then insert, update, and delete data in the MySQL server.
mysql> use test;
mysql> show tables;
mysql> CREATE TABLE IF NOT EXISTS `test_table`(`test_id` INT UNSIGNED AUTO_INCREMENT,`test_title` VARCHAR(100) NOT NULL,
`test_author` VARCHAR(40) NOT NULL,
`test_date` DATE,PRIMARY KEY ( `test_id` ))ENGINE=InnoDB DEFAULT CHARSET=utf8;
mysql> INSERT INTO test_table (test_title, test_author, test_date) VALUES("a", "b", NOW());
mysql> UPDATE test_table SET test_title='c' WHERE test_title='a';
mysql> DELETE FROM test_table WHERE test_title='c';
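The walkthrough above runs each docker exec command right after starting a container, but a fresh mysql:5.7 container can take several seconds to initialize. Docker's published ports can also accept a TCP connection on the host before MySQL itself is listening. For that reason, this minimal sketch waits for MySQL's initial handshake packet rather than just an open port. It assumes the -p 3306:3306 mapping from the MySQL docker run command. The helpers is_mysql_greeting, mysql_greeting_received, and wait_for_mysql are hypothetical.

```python
import socket
import time


def is_mysql_greeting(data: bytes) -> bool:
    """Check the first 5 bytes for a MySQL protocol-10 initial handshake packet."""
    if len(data) < 5:
        return False
    payload_len = int.from_bytes(data[0:3], "little")  # 3-byte little-endian length
    sequence_id = data[3]                              # the first packet has sequence 0
    protocol_version = data[4]                         # HandshakeV10 starts with 10
    return payload_len > 0 and sequence_id == 0 and protocol_version == 10


def mysql_greeting_received(host: str, port: int, timeout: float = 2.0) -> bool:
    """Connect and read the first bytes the server sends unprompted."""
    try:
        with socket.create_connection((host, port), timeout=timeout) as sock:
            data = b""
            while len(data) < 5:
                chunk = sock.recv(5 - len(data))
                if not chunk:  # peer closed before sending a full greeting
                    break
                data += chunk
    except OSError:  # refused, reset, or timed out
        return False
    return is_mysql_greeting(data)


def wait_for_mysql(host: str = "127.0.0.1", port: int = 3306,
                   deadline_s: float = 60.0, interval_s: float = 1.0) -> bool:
    """Poll until MySQL sends its greeting or the deadline passes."""
    end = time.monotonic() + deadline_s
    while time.monotonic() < end:
        if mysql_greeting_received(host, port):
            return True
        time.sleep(interval_s)
    return False
```

For example, call wait_for_mysql() from the host before creating the test database. A greeting shows that the server accepts connections, not that your credentials work.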
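Canal reads MySQL's binary log. That is why the mysqld.cnf in the walkthrough enables log-bin, sets binlog-format=ROW, and sets a server_id. The following is a minimal sketch that checks a mysqld.cnf text for those three settings with Python's configparser before you copy the file into the container. check_binlog_settings is a hypothetical helper, and it inspects only the file, not the running server.

```python
import configparser
from typing import List


def check_binlog_settings(cnf_text: str) -> List[str]:
    """Return problems with the [mysqld] binlog settings (an empty list if none)."""
    parser = configparser.ConfigParser(allow_no_value=True, strict=False,
                                       interpolation=None)
    parser.read_string(cnf_text)
    if not parser.has_section("mysqld"):
        return ["missing [mysqld] section"]
    # MySQL treats '-' and '_' in option names as equivalent, so normalize them.
    opts = {key.replace("-", "_"): value for key, value in parser.items("mysqld")}
    problems = []
    if "log_bin" not in opts:  # a bare `log-bin` line (value None) also counts
        problems.append("log-bin is not set, so binary logging is disabled")
    if (opts.get("binlog_format") or "").upper() != "ROW":
        problems.append("binlog-format should be ROW for row-level change events")
    server_id = opts.get("server_id") or ""
    if not server_id.isdigit() or int(server_id) == 0:
        problems.append("server_id should be a non-zero integer")
    return problems
```

For example, `check_binlog_settings(open("mysqld.cnf").read())` should return an empty list for the mysqld.cnf contents used in the walkthrough.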
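The connector and the Canal server are configured in different places. As a result, a mismatch only shows up at run time. For example, the earlier configuration example uses destination example, while the Canal server in this walkthrough serves test. The following is a minimal sketch of a consistency check between the two. It assumes the properties from canal-mysql-source-config.yaml and the canal.* variables from the Canal docker run command are available as dicts, and that the Canal server port defaults to 11111 when canal.port is not set. check_connector_matches_server is a hypothetical helper.

```python
from typing import Any, Dict, List

# Assumption: the Canal server listens on canal.port, which defaults to 11111.
CANAL_DEFAULT_PORT = "11111"


def check_connector_matches_server(connector: Dict[str, Any],
                                   server_env: Dict[str, str]) -> List[str]:
    """Hypothetical cross-check; returns a list of mismatches (empty if none)."""
    problems = []
    # canal.destinations is a comma-separated list of instance names.
    served = [d.strip() for d in server_env.get("canal.destinations", "").split(",")
              if d.strip()]
    destination = str(connector.get("destination", ""))
    if destination not in served:
        problems.append(f"destination {destination!r} is not in canal.destinations {served}")
    cluster = str(connector.get("cluster", False)).strip().lower() == "true"
    if not cluster:
        # In non-cluster mode the connector dials singleHostname:singlePort directly.
        port = str(connector.get("singlePort", ""))
        expected = server_env.get("canal.port", CANAL_DEFAULT_PORT)
        if port != expected:
            problems.append(f"singlePort {port} does not match canal.port {expected}")
    return problems
```

This sketch does not check singleHostname, because the right value depends on the Docker network (here, the --link name pulsar-canal-server).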
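The consumer in pulsar-client.py prints each message payload as it arrives. The following is a minimal sketch of a helper that decodes a payload as UTF-8 and pretty-prints it when it parses as JSON, falling back to the plain text otherwise. It makes no assumption about the exact structure that CanalStringSource produces. format_payload is a hypothetical helper.

```python
import json


def format_payload(data: bytes) -> str:
    """Decode a message payload and pretty-print it if it is JSON."""
    text = data.decode("utf-8", errors="replace")
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        return text  # not JSON: show the decoded text as-is
    return json.dumps(parsed, indent=2, ensure_ascii=False, sort_keys=True)
```

Inside the receive loop, you could call `print(format_payload(msg.data()))` in place of the existing print call.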
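To see more than a single change event reach the consumer, you can feed a batch of statements to the MySQL client. The following is a minimal sketch that prints INSERT, UPDATE, and DELETE statements for the test_table created in the last step, assuming that table exists in the test database. The row count and the title and author values are arbitrary, and change_statements is a hypothetical helper.

```python
from typing import List


def change_statements(rows: int = 3) -> List[str]:
    """Build INSERT, UPDATE, and DELETE statements for the walkthrough's test_table."""
    statements = []
    for i in range(1, rows + 1):
        statements.append(
            "INSERT INTO test_table (test_title, test_author, test_date) "
            f"VALUES ('title-{i}', 'author-{i}', CURDATE());"
        )
    for i in range(1, rows + 1):
        statements.append(
            f"UPDATE test_table SET test_title = 'updated-{i}' WHERE test_title = 'title-{i}';"
        )
    for i in range(1, rows + 1):
        statements.append(f"DELETE FROM test_table WHERE test_title = 'updated-{i}';")
    return statements


if __name__ == "__main__":
    # One statement per line; the mysql client reads them from stdin.
    print("\n".join(change_statements()))
```

For example, `python gen_changes.py | docker exec -i pulsar-mysql mysql -h 127.0.0.1 -uroot -pcanal test` runs the statements. The file name gen_changes.py is hypothetical.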