These instructions assume you are running Pulsar in standalone mode. However, all
the commands used in this tutorial can be used in a multi-node Pulsar cluster without any changes.
All the instructions assume that you run them from the root directory of a Pulsar binary distribution.
[
  {"name": "aerospike", "description": "Aerospike database sink", "sinkClass": "org.apache.pulsar.io.aerospike.AerospikeStringSink"},
  {"name": "cassandra", "description": "Writes data into Cassandra", "sinkClass": "org.apache.pulsar.io.cassandra.CassandraStringSink"},
  {"name": "kafka", "description": "Kafka source and sink connector", "sourceClass": "org.apache.pulsar.io.kafka.KafkaStringSource", "sinkClass": "org.apache.pulsar.io.kafka.KafkaBytesSink"},
  {"name": "kinesis", "description": "Kinesis sink connector", "sinkClass": "org.apache.pulsar.io.kinesis.KinesisSink"},
  {"name": "rabbitmq", "description": "RabbitMQ source connector", "sourceClass": "org.apache.pulsar.io.rabbitmq.RabbitMQSource"},
  {"name": "twitter", "description": "Ingest data from Twitter firehose", "sourceClass": "org.apache.pulsar.io.twitter.TwitterFireHose"}
]
If an error occurs when starting the Pulsar service, you may see an exception in the terminal running Pulsar standalone,
or you can view the logs in the logs directory under the Pulsar directory.
This section demonstrates how to connect Pulsar to Cassandra.
tip
Make sure you have Docker installed. If you have not installed it yet, see Install Docker.
The Cassandra sink connector reads messages from Pulsar topics and writes the messages into Cassandra tables. For more information, see Cassandra sink connector.
$ docker exec -ti cassandra cqlsh localhost
Connected to Test Cluster at localhost:9042.
[cqlsh 5.0.1 | Cassandra 3.11.2 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.
cqlsh>
Create a keyspace pulsar_test_keyspace.
cqlsh> CREATE KEYSPACE pulsar_test_keyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};
Create a table pulsar_test_table.
cqlsh> USE pulsar_test_keyspace;
cqlsh:pulsar_test_keyspace> CREATE TABLE pulsar_test_table (key text PRIMARY KEY, col text);
Now you have a Cassandra cluster running locally.
In this section, you configure a Cassandra sink connector.
To run a Cassandra sink connector, you need to prepare a configuration file that contains the information the Pulsar connector runtime needs,
for example, how the connector finds the Cassandra cluster, which keyspace and table it writes Pulsar messages to, and so on.
You can create a configuration file through one of the following methods.
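For example, a YAML configuration file could look like the sketch below, which writes examples/cassandra-sink.yml (the path used later in this section). It assumes the field names roots, keyspace, columnFamily, keyname, and columnName of the Cassandra sink connector configuration; check them against the connector documentation for your Pulsar version. The values point at the local Cassandra instance, keyspace, table, and columns created above.

```shell
#!/bin/sh
# Write a Cassandra sink configuration matching the keyspace and table created above.
# Field names are assumed from the Cassandra sink connector configuration.
mkdir -p examples
cat > examples/cassandra-sink.yml <<'EOF'
configs:
  roots: "localhost:9042"
  keyspace: "pulsar_test_keyspace"
  columnFamily: "pulsar_test_table"
  keyname: "key"
  columnName: "col"
EOF
```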
You can use the Connector Admin CLI to create a sink connector and perform other operations on it.
Run the following command to create a Cassandra sink connector with sink type cassandra and the config file examples/cassandra-sink.yml created previously.
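A minimal sketch of such a command follows, wrapped in a shell function so it can be defined first and called once Pulsar is running. It uses the pulsar-admin sinks create flags --tenant, --namespace, --name, --sink-type, --sink-config-file, and --inputs; the tenant public and namespace default are assumed defaults, and the sink name cassandra-test-sink and input topic test_cassandra are hypothetical.

```shell
#!/bin/sh
# Create a sink from the built-in "cassandra" connector.
# Run from the root directory of the Pulsar binary distribution.
# Hypothetical values: sink name cassandra-test-sink, input topic test_cassandra.
create_cassandra_sink() {
  bin/pulsar-admin sinks create \
    --tenant public \
    --namespace default \
    --name cassandra-test-sink \
    --sink-type cassandra \
    --sink-config-file examples/cassandra-sink.yml \
    --inputs test_cassandra
}
```

Call create_cassandra_sink after defining it, or run the command inside the function body directly.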
Keep STDIN open even if not attached and allocate a terminal.
--rm: Remove the container automatically when it exits.
--name: Assign a name to the container. This example specifies pulsar-mysql for the container.
-p: Publish a port of the container to the host. This example publishes port 3306 of the container to the host.
-e: Set environment variables. This example sets the following variables:
- The password for the root user is jdbc.
- The name for the normal user is mysqluser.
- The password for the normal user is mysqlpw.
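The command that these options belong to is not shown in this section. A sketch consistent with them is below, wrapped in a hypothetical shell function start_pulsar_mysql. It assumes the official mysql Docker image, whose MYSQL_ROOT_PASSWORD, MYSQL_USER, and MYSQL_PASSWORD variables carry the values listed above; the image tag mysql:5.7 (chosen because the startup log reports version 5.7.26) and the host port 3306 are also assumptions.

```shell
#!/bin/sh
# Start MySQL in the foreground with the options described above.
# Assumptions: official mysql image, tag 5.7, host port 3306.
start_pulsar_mysql() {
  docker run -it --rm \
    --name pulsar-mysql \
    -p 3306:3306 \
    -e MYSQL_ROOT_PASSWORD=jdbc \
    -e MYSQL_USER=mysqluser \
    -e MYSQL_PASSWORD=mysqlpw \
    mysql:5.7
}
```

Because -it keeps the container attached to the terminal, run the remaining steps from another terminal.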
tip
For more information about Docker commands, see Docker CLI.
Check whether MySQL has started successfully.
$ docker logs -f pulsar-mysql
MySQL has been started successfully if the following message appears.
2019-05-11T10:40:58.709964Z 0 [Note] Found ca.pem, server-cert.pem and server-key.pem in data directory. Trying to enable SSL support using them.
2019-05-11T10:40:58.710155Z 0 [Warning] CA certificate ca.pem is self signed.
2019-05-11T10:40:58.711921Z 0 [Note] Server hostname (bind-address): '*'; port: 3306
2019-05-11T10:40:58.711985Z 0 [Note] IPv6 is available.
2019-05-11T10:40:58.712695Z 0 [Note] - '::' resolves to '::';
2019-05-11T10:40:58.712742Z 0 [Note] Server socket created on IP: '::'.
2019-05-11T10:40:58.714334Z 0 [Warning] Insecure configuration for --pid-file: Location '/var/run/mysqld' in the path is accessible to all OS users. Consider choosing a different directory.
2019-05-11T10:40:58.723802Z 0 [Note] Event Scheduler: Loaded 0 events
2019-05-11T10:40:58.724200Z 0 [Note] mysqld: ready for connections. Version: '5.7.26' socket: '/var/run/mysqld/mysqld.sock' port: 3306 MySQL Community Server (GPL)
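To script this check instead of watching the log by eye, a sketch like the following polls docker logs for the last message above. The function names are hypothetical, and the 30 attempts at 2-second intervals are arbitrary. It matches the "Version: ... port: 3306" part of the ready message rather than any "port: 3306" text, because the earlier "Server hostname (bind-address)" line also mentions that port.

```shell
#!/bin/sh
# Succeeds if the log text on stdin contains the MySQL "Version: ... port: 3306" message.
mysql_is_ready() {
  grep -q "Version: .*port: 3306"
}

# Poll the pulsar-mysql container log until MySQL reports it is ready, or give up.
wait_for_mysql() {
  attempts=0
  until docker logs pulsar-mysql 2>&1 | mysql_is_ready; do
    attempts=$((attempts + 1))
    if [ "$attempts" -ge 30 ]; then
      echo "MySQL did not become ready in time" >&2
      return 1
    fi
    sleep 2
  done
  echo "MySQL is ready"
}
```

Run wait_for_mysql in a second terminal after starting the container.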
Access MySQL.
$ docker exec -it pulsar-mysql /bin/bash
mysql -h localhost -uroot -pjdbc
Create a MySQL database and a table, both named pulsar_mysql_jdbc_sink.
mysql> create database pulsar_mysql_jdbc_sink;
mysql> use pulsar_mysql_jdbc_sink;
mysql> create table if not exists pulsar_mysql_jdbc_sink (
  id INT AUTO_INCREMENT,
  name VARCHAR(255) NOT NULL,
  primary key (id)
) engine=innodb;
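The JDBC sink also needs a configuration file telling it how to reach this table. A sketch is below; the file name examples/pulsar-mysql-jdbc-sink.yaml is hypothetical, the field names userName, password, jdbcUrl, and tableName are assumed from the JDBC sink connector configuration, and the URL assumes Pulsar runs on the same host as the container published on port 3306.

```shell
#!/bin/sh
# Write a JDBC sink configuration for the MySQL database and table created above.
# Hypothetical file name; field names assumed from the JDBC sink connector configuration.
mkdir -p examples
cat > examples/pulsar-mysql-jdbc-sink.yaml <<'EOF'
configs:
  userName: "root"
  password: "jdbc"
  jdbcUrl: "jdbc:mysql://127.0.0.1:3306/pulsar_mysql_jdbc_sink"
  tableName: "pulsar_mysql_jdbc_sink"
EOF
```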
Once the command is executed, Pulsar creates a sink connector pulsar-mysql-jdbc-sink.
This sink connector runs as a Pulsar Function and writes the messages produced to the topic pulsar-mysql-jdbc-sink-topic to the MySQL table pulsar_mysql_jdbc_sink.
For more information about pulsar-admin sinks restart options, see here.
The sink instance has been started successfully if the following message appears.
"Started successfully"
tip
Optionally, you can run a standalone sink connector using pulsar-admin sinks localrun options.
Note that pulsar-admin sinks localrun options runs a sink connector locally, while pulsar-admin sinks start options starts a sink connector in a cluster.
For more information about pulsar-admin sinks localrun options, see here.