These instructions assume you are running Pulsar in standalone mode. However, all
the commands used in this tutorial should work in a multi-node Pulsar cluster without any changes.
All the instructions are assumed to be run from the root directory of a Pulsar binary distribution.
All the components of a Pulsar service start in order. You can curl the Pulsar service endpoints to make sure the service is up and running correctly.
[{"name":"aerospike","description":"Aerospike database sink","sinkClass":"org.apache.pulsar.io.aerospike.AerospikeStringSink"},{"name":"cassandra","description":"Writes data into Cassandra","sinkClass":"org.apache.pulsar.io.cassandra.CassandraStringSink"},{"name":"kafka","description":"Kafka source and sink connector","sourceClass":"org.apache.pulsar.io.kafka.KafkaStringSource","sinkClass":"org.apache.pulsar.io.kafka.KafkaBytesSink"},{"name":"kinesis","description":"Kinesis sink connector","sinkClass":"org.apache.pulsar.io.kinesis.KinesisSink"},{"name":"rabbitmq","description":"RabbitMQ source connector","sourceClass":"org.apache.pulsar.io.rabbitmq.RabbitMQSource"},{"name":"twitter","description":"Ingest data from Twitter firehose","sourceClass":"org.apache.pulsar.io.twitter.TwitterFireHose"}]
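To check the response programmatically rather than by eye, a minimal Python sketch along the following lines may help. It parses a shortened copy of the connector list above and splits the entries into sources and sinks. The helper name `split_connectors` is hypothetical; in practice the JSON text would come from the endpoint you curl.

```python
import json

# Shortened copy of the connector list shown above (three of the six entries).
RESPONSE = '''[
  {"name": "cassandra", "description": "Writes data into Cassandra",
   "sinkClass": "org.apache.pulsar.io.cassandra.CassandraStringSink"},
  {"name": "kafka", "description": "Kafka source and sink connector",
   "sourceClass": "org.apache.pulsar.io.kafka.KafkaStringSource",
   "sinkClass": "org.apache.pulsar.io.kafka.KafkaBytesSink"},
  {"name": "rabbitmq", "description": "RabbitMQ source connector",
   "sourceClass": "org.apache.pulsar.io.rabbitmq.RabbitMQSource"}
]'''


def split_connectors(payload):
    """Return (sources, sinks) as sorted lists of connector names.

    Hypothetical helper: a connector counts as a source if it declares
    "sourceClass" and as a sink if it declares "sinkClass".
    """
    connectors = json.loads(payload)
    sources = sorted(c["name"] for c in connectors if "sourceClass" in c)
    sinks = sorted(c["name"] for c in connectors if "sinkClass" in c)
    return sources, sinks


sources, sinks = split_connectors(RESPONSE)
print("sources:", sources)
print("sinks:", sinks)
```

If `cassandra` is missing from the sinks list, the Cassandra connector is probably not installed in this distribution.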
If an error occurs while starting the Pulsar service, you may see an exception in the terminal where you are running pulsar/standalone,
or you can navigate to the logs directory under the Pulsar directory to view the logs.
We use cqlsh to connect to the Cassandra cluster and create a keyspace and a table.
$ docker exec -ti cassandra cqlsh localhost
Connected to Test Cluster at localhost:9042.
[cqlsh 5.0.1 | Cassandra 3.11.2 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.
cqlsh>
Now that we have a Cassandra cluster running locally, we will configure a Cassandra sink connector in this section.
The Cassandra sink connector will read messages from a Pulsar topic and write the messages into a Cassandra table.
In order to run a Cassandra sink connector, you need to prepare a YAML config file that includes the information the Pulsar IO
runtime needs to know: for example, how Pulsar IO can find the Cassandra cluster, and which keyspace and table
Pulsar IO will use for writing Pulsar messages.
Create a file examples/cassandra-sink.yml and edit it to fill in the following content:
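As a rough illustration of what such a file might contain, the following Python sketch writes a candidate examples/cassandra-sink.yml. The key names (`roots`, `keyspace`, `columnFamily`, `keyname`, `columnName`) are assumed from the Cassandra sink's documented options. The address localhost:9042 and the table pulsar_test_table come from this tutorial. The keyspace, key, and column values are placeholders that must match whatever you created in cqlsh.

```python
from pathlib import Path

# Assumed settings. Only "roots" and "columnFamily" use values that appear in
# this tutorial; the keyspace, key name, and column name are placeholders.
CASSANDRA_SINK_CONFIG = {
    "roots": "localhost:9042",
    "keyspace": "pulsar_test_keyspace",
    "columnFamily": "pulsar_test_table",
    "keyname": "key",
    "columnName": "col",
}


def render_sink_yaml(configs):
    """Render a flat dict as a YAML document with one top-level 'configs' map."""
    lines = ["configs:"]
    for key, value in configs.items():
        lines.append(f'    {key}: "{value}"')
    return "\n".join(lines) + "\n"


def write_sink_config(path, configs):
    """Write the rendered YAML to path, creating parent directories if needed."""
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(render_sink_yaml(configs))
    return target


config_path = write_sink_config("examples/cassandra-sink.yml", CASSANDRA_SINK_CONFIG)
print(config_path.read_text())
```

Writing the YAML by hand works just as well; the script only makes the expected shape of the file explicit.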
Once the command is executed, Pulsar creates a sink connector named cassandra-test-sink. The sink connector runs
as a Pulsar Function and writes the messages produced in the topic test_cassandra to the Cassandra table pulsar_test_table.
| Flag | Description | This example |
| - | - | - |
| -d | Start a container in detached mode. | / |
| -it | Keep STDIN open even if not attached and allocate a terminal. | / |
| --rm | Remove the container automatically when it exits. | / |
| --name | Assign a name to the container. | This example specifies pulsar-mysql for the container. |
| -p | Publish the port of the container to the host. | This example publishes the port 3306 of the container to the host. |
| -e | Set environment variables. | This example sets the following variables: the password for the root user is jdbc; the name for the normal user is mysqluser; the password for the normal user is mysqlpw. |
For more information about Docker command, see here.
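To see how the flags in the table fit together, here is a hedged Python sketch that assembles a `docker run` argument list without executing it. The image name `mysql:5.7` is an assumption (the log output in this tutorial reports version 5.7.26). The variable names `MYSQL_ROOT_PASSWORD`, `MYSQL_USER`, and `MYSQL_PASSWORD` are those used by the official MySQL image and may differ for other images. The helper `mysql_docker_command` is hypothetical.

```python
import shlex


def mysql_docker_command(image, name="pulsar-mysql", port=3306):
    """Build (but do not run) a docker command matching the flags in the table."""
    # Assumed variable names, as used by the official MySQL image.
    env = {
        "MYSQL_ROOT_PASSWORD": "jdbc",
        "MYSQL_USER": "mysqluser",
        "MYSQL_PASSWORD": "mysqlpw",
    }
    args = ["docker", "run", "-d", "-it", "--rm",
            f"--name={name}", "-p", f"{port}:{port}"]
    for key, value in env.items():
        args += ["-e", f"{key}={value}"]
    args.append(image)  # the image goes last, after all options
    return args


# "mysql:5.7" is an assumed image, not one named in this tutorial.
command = mysql_docker_command("mysql:5.7")
print(shlex.join(command))
```

Passing the list to `subprocess.run(command, check=True)` would start the container; printing it first makes it easy to review before running.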
Check if MySQL has been started successfully.
$ docker logs -f pulsar-mysql
MySQL has been started successfully if the following message appears.
2019-05-11T10:40:58.709964Z 0 [Note] Found ca.pem, server-cert.pem and server-key.pem in data directory. Trying to enable SSL support using them.
2019-05-11T10:40:58.710155Z 0 [Warning] CA certificate ca.pem is self signed.
2019-05-11T10:40:58.711921Z 0 [Note] Server hostname (bind-address): '*'; port: 3306
2019-05-11T10:40:58.711985Z 0 [Note] IPv6 is available.
2019-05-11T10:40:58.712695Z 0 [Note] - '::' resolves to '::';
2019-05-11T10:40:58.712742Z 0 [Note] Server socket created on IP: '::'.
2019-05-11T10:40:58.714334Z 0 [Warning] Insecure configuration for --pid-file: Location '/var/run/mysqld' in the path is accessible to all OS users. Consider choosing a different directory.
2019-05-11T10:40:58.723802Z 0 [Note] Event Scheduler: Loaded 0 events
2019-05-11T10:40:58.724200Z 0 [Note] mysqld: ready for connections.
Version: '5.7.26' socket: '/var/run/mysqld/mysqld.sock' port: 3306 MySQL Community Server (GPL)
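If you would rather script this check than watch the log, a small Python sketch like the one below could scan captured log text for the readiness line. The function name `mysql_is_ready` is hypothetical, and the sample text is copied from the output above. How you capture the log, for example by saving the output of `docker logs pulsar-mysql`, is left open.

```python
# Marker taken from the log output shown above.
READY_MARKER = "mysqld: ready for connections."


def mysql_is_ready(log_text):
    """Return True if any line of the captured log contains the ready marker."""
    return any(READY_MARKER in line for line in log_text.splitlines())


sample = """\
2019-05-11T10:40:58.723802Z 0 [Note] Event Scheduler: Loaded 0 events
2019-05-11T10:40:58.724200Z 0 [Note] mysqld: ready for connections.
"""

print(mysql_is_ready(sample))
```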
create database pulsar_mysql_jdbc_sink;
use pulsar_mysql_jdbc_sink;
create table if not exists pulsar_mysql_jdbc_sink
(
  id INT AUTO_INCREMENT,
  name VARCHAR(255) NOT NULL,
  primary key (id)
)
engine=innodb;
Now that we have MySQL running locally, we will configure a JDBC sink connector in this section. The JDBC sink connector will read messages from a Pulsar topic and write messages into a MySQL table.
Add a configuration file.
To run a JDBC sink connector, you need to prepare a YAML config file that includes the information the Pulsar IO runtime needs to know: for example, how Pulsar IO can find the MySQL cluster, and which JDBC URL and table Pulsar IO will use for writing messages.
Create a pulsar-mysql-jdbc-sink.yaml file, copy the following contents to this file, and place the file in the pulsar/connectors folder.
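As a sketch of what the file might look like, the Python snippet below renders a candidate configuration as YAML text. The key names (`userName`, `password`, `jdbcUrl`, `tableName`) are assumed from the JDBC sink's documented options. The root password, port, database, and table names come from the earlier steps of this tutorial, and the host 127.0.0.1 is an assumption for a locally published container.

```python
def jdbc_url(host, port, database):
    """Build a MySQL JDBC URL of the form jdbc:mysql://host:port/database."""
    return f"jdbc:mysql://{host}:{port}/{database}"


# Assumed key names; values reuse the root password, port, database, and table
# set up earlier. The host 127.0.0.1 is an assumption for a local container.
JDBC_SINK_CONFIG = {
    "userName": "root",
    "password": "jdbc",
    "jdbcUrl": jdbc_url("127.0.0.1", 3306, "pulsar_mysql_jdbc_sink"),
    "tableName": "pulsar_mysql_jdbc_sink",
}

# Render a single top-level "configs" map with quoted string values.
yaml_text = "configs:\n" + "".join(
    f'    {key}: "{value}"\n' for key, value in JDBC_SINK_CONFIG.items()
)
print(yaml_text)
```

Saving `yaml_text` as pulsar-mysql-jdbc-sink.yaml in the connectors folder would give a file of the expected shape, provided the assumed keys match your connector version.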
Once the command is executed, Pulsar will create a sink connector named pulsar-mysql-jdbc-sink, and the sink connector will be running as a Pulsar Function and write the messages produced in the pulsar-mysql-jdbc-sink-topic topic to the MySQL pulsar_mysql_jdbc_sink table.
tip
| Flag | Description | This example |
| - | - | - |
| --archive | Path to the archive file for the sink. | pulsar-io-jdbc-2.4.2.nar |
| --inputs | The input topic or topics of the sink. (Multiple topics can be specified as a comma-separated list.) | pulsar-mysql-jdbc-sink-topic |
| --name | The name of the sink. | pulsar-mysql-jdbc-sink |
| --sink-config-file | The path to a YAML config file specifying the configuration of the sink. | pulsar-mysql-jdbc-sink.yaml |
| --parallelism | The parallelism factor of the sink. For example, the number of sink instances to run. | 1 |
For more information about pulsar-admin sinks create options, see here.
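To make the flag table concrete, the following Python sketch assembles a `pulsar-admin sinks create` argument list from those flags without running it. The `bin/pulsar-admin` path assumes you are at the root of the Pulsar binary distribution. The archive and config file values are taken as written in the table and may need a directory prefix, such as the connectors folder, on your machine. The helper `sinks_create_command` is hypothetical.

```python
import shlex

# Flag values as listed in the table; adjust the paths to where the
# files actually live on your machine.
SINK_FLAGS = {
    "--archive": "pulsar-io-jdbc-2.4.2.nar",
    "--inputs": "pulsar-mysql-jdbc-sink-topic",
    "--name": "pulsar-mysql-jdbc-sink",
    "--sink-config-file": "pulsar-mysql-jdbc-sink.yaml",
    "--parallelism": "1",
}


def sinks_create_command(flags, admin="bin/pulsar-admin"):
    """Build (but do not run) a 'pulsar-admin sinks create' argument list."""
    args = [admin, "sinks", "create"]
    for flag, value in flags.items():
        args += [flag, value]
    return args


command = sinks_create_command(SINK_FLAGS)
print(shlex.join(command))
```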
The sink has been created successfully if the following message appears.
The sink instance has been started successfully if the following message appears.
"Started successfully"
tip
Optionally, you can run a standalone sink connector using pulsar-admin sinks localrun options.
Note that pulsar-admin sinks localrun options runs a sink connector locally, while pulsar-admin sinks start options starts a sink connector in a cluster.
For more information about pulsar-admin sinks localrun options, see here.