These instructions assume you are running Pulsar in standalone mode. However, all the commands used in this tutorial can be used in a multi-node Pulsar cluster without any changes.
All commands are assumed to be run from the root directory of a Pulsar binary distribution.
To enable Pulsar connectors, download the connectors tarball release from the download page.
After you download the NAR file, copy it to the connectors directory in the Pulsar directory, creating that directory first if it does not exist. For example, if you download the pulsar-io-aerospike-3.3.2.nar connector file, copy it into connectors/.
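The copy step can be sketched as a small shell function. It assumes the NAR file has already been downloaded. The function name install_connector is hypothetical and not part of Pulsar.

```shell
# Hypothetical helper: copy a downloaded connector NAR into <pulsar-dir>/connectors.
# Usage: install_connector <nar-file> [pulsar-dir]
install_connector() {
  local nar="$1"
  local pulsar_dir="${2:-.}"   # defaults to the current directory (the Pulsar root)

  if [ ! -f "$nar" ]; then
    echo "install_connector: '$nar' not found" >&2
    return 1
  fi

  # Create the connectors directory if needed, copy the NAR, then list the directory.
  mkdir -p "$pulsar_dir/connectors"
  cp "$nar" "$pulsar_dir/connectors/"
  ls "$pulsar_dir/connectors"
}
```

For example, running install_connector pulsar-io-aerospike-3.3.2.nar from the Pulsar root directory copies the Aerospike connector into connectors/ and lists that directory.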
If you are running Pulsar in a bare metal cluster, make sure the connectors tarball is unzipped into the Pulsar directory of every broker (or into the Pulsar directory of every function worker if you are running a separate worker cluster for Pulsar Functions).
If you are running Pulsar in Docker or deploying Pulsar from a Docker image (for example, on Kubernetes), you can use the apachepulsar/pulsar-all image instead of the apachepulsar/pulsar image. The apachepulsar/pulsar-all image already bundles all built-in connectors.
The list of available connectors looks similar to the following:
[
  {"name":"aerospike","description":"Aerospike database sink","sinkClass":"org.apache.pulsar.io.aerospike.AerospikeStringSink"},
  {"name":"cassandra","description":"Writes data into Cassandra","sinkClass":"org.apache.pulsar.io.cassandra.CassandraStringSink"},
  {"name":"kafka","description":"Kafka source and sink connector","sourceClass":"org.apache.pulsar.io.kafka.KafkaStringSource","sinkClass":"org.apache.pulsar.io.kafka.KafkaBytesSink"},
  {"name":"kinesis","description":"Kinesis sink connector","sinkClass":"org.apache.pulsar.io.kinesis.KinesisSink"},
  {"name":"rabbitmq","description":"RabbitMQ source connector","sourceClass":"org.apache.pulsar.io.rabbitmq.RabbitMQSource"},
  {"name":"twitter","description":"Ingest data from Twitter firehose","sourceClass":"org.apache.pulsar.io.twitter.TwitterFireHose"}
]
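The list above is the kind of response returned by the connectors endpoint of the functions admin REST API. On a standalone service listening on the default port 8080, a request such as curl -s http://localhost:8080/admin/v2/functions/connectors should return it. The sketch below checks that list for a given connector name. The function names are hypothetical, and because the sketch uses grep instead of a JSON parser, it assumes the compact "key":"value" form shown above (no spaces around the colons).

```shell
# Hypothetical helper: succeed if the connector list read from stdin
# contains an entry whose "name" equals $1 (plain grep, not a JSON parser).
has_connector() {
  grep -q "\"name\":\"$1\""
}

# Hypothetical helper: print every connector name from stdin, one per line.
list_connector_names() {
  grep -o '"name":"[^"]*"' | sed 's/^"name":"//; s/"$//'
}
```

For example: curl -s http://localhost:8080/admin/v2/functions/connectors | has_connector cassandra && echo "cassandra is available"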
If an error occurs when starting the Pulsar service, you may see an exception in the terminal running bin/pulsar standalone,
or you can view the logs in the logs directory under the Pulsar directory.
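To search the logs directory for the exception, a sketch like the following can help. The function name is hypothetical, and it assumes the log files in logs/ use the .log extension.

```shell
# Hypothetical helper: print every line containing "Exception" from the
# *.log files under <pulsar-dir>/logs, prefixed with file name and line number.
# Usage: find_exceptions [pulsar-dir]
find_exceptions() {
  local logs_dir="${1:-.}/logs"
  if [ ! -d "$logs_dir" ]; then
    echo "find_exceptions: no logs directory at $logs_dir" >&2
    return 1
  fi
  # -n adds line numbers, -H always shows the file name.
  # grep exits with status 1 when nothing matches, so callers can test the result.
  grep -nH "Exception" "$logs_dir"/*.log
}
```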
This section demonstrates how to connect Pulsar to Cassandra.
Tip: Make sure you have Docker installed. If you do not, see install Docker. For more information about Docker commands, see Docker CLI.
The Cassandra sink connector reads messages from Pulsar topics and writes the messages into Cassandra tables. For more information, see Cassandra sink connector.
To connect Pulsar to Cassandra, you can follow the steps below:
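A minimal sketch of starting Cassandra in Docker and creating a keyspace and table for the sink. The image tag cassandra:3.11, the container name cassandra, and the names pulsar_test_keyspace, pulsar_test_table, key, and col are illustrative assumptions; whatever names you choose must match the sink configuration. The function names are hypothetical.

```shell
# Start a single-node Cassandra container exposing the CQL port 9042
# (image tag and container name are illustrative).
start_cassandra() {
  docker run -d --rm --name=cassandra -p 9042:9042 cassandra:3.11
}

# Create the keyspace, then the table, by running cqlsh inside the container.
# Keyspace, table, and column names are illustrative and must match the sink config.
create_cassandra_table() {
  docker exec cassandra cqlsh -e "CREATE KEYSPACE IF NOT EXISTS pulsar_test_keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};" localhost &&
  docker exec cassandra cqlsh -e "CREATE TABLE IF NOT EXISTS pulsar_test_keyspace.pulsar_test_table (key text PRIMARY KEY, col text);" localhost
}
```

Run start_cassandra, wait until docker logs -f cassandra shows that Cassandra has finished starting (cqlsh cannot connect before then), and then run create_cassandra_table.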
Now you have a Cassandra cluster running locally.
Next, configure a Cassandra sink connector.
To run a Cassandra sink connector, prepare a configuration file containing the information that the Pulsar connector runtime needs,
for example, how to reach the Cassandra cluster and which keyspace and table the connector writes Pulsar messages to.
You can write the configuration file in either YAML or JSON format.
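For example, the following sketch writes a YAML configuration to examples/cassandra-sink.yml. The property names (roots, keyspace, columnFamily, keyname, columnName) follow the Cassandra sink connector's configuration; verify them against the connector reference for your Pulsar version. The values assume a local Cassandra on port 9042 and the illustrative keyspace pulsar_test_keyspace with table pulsar_test_table (columns key and col). The function name is hypothetical.

```shell
# Hypothetical helper: write <dir>/cassandra-sink.yml (default dir: examples).
# The keyspace, table, and column values are illustrative assumptions.
write_cassandra_sink_config() {
  local dir="${1:-examples}"
  mkdir -p "$dir"
  cat > "$dir/cassandra-sink.yml" <<'EOF'
configs:
    roots: "localhost:9042"
    keyspace: "pulsar_test_keyspace"
    columnFamily: "pulsar_test_table"
    keyname: "key"
    columnName: "col"
EOF
}
```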
You can use the Connector Admin CLI to create a sink connector and perform other operations on it.
To create a Cassandra sink connector, run the pulsar-admin sinks create command with the sink type cassandra and the config file examples/cassandra-sink.yml created previously.
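A sketch of that command, wrapped in shell functions together with a status check. The sink name cassandra-test-sink and the input topic test_cassandra are illustrative, and PULSAR_ADMIN is a variable of this sketch (defaulting to bin/pulsar-admin, relative to the Pulsar root), not a Pulsar setting.

```shell
# PULSAR_ADMIN is this sketch's own variable: the admin CLI to call,
# defaulting to bin/pulsar-admin (run from the Pulsar root directory).

# Create a Cassandra sink in public/default that reads from the
# illustrative topic test_cassandra.
create_cassandra_sink() {
  "${PULSAR_ADMIN:-bin/pulsar-admin}" sinks create \
    --tenant public \
    --namespace default \
    --name cassandra-test-sink \
    --sink-type cassandra \
    --sink-config-file examples/cassandra-sink.yml \
    --inputs test_cassandra
}

# Show the runtime status of the sink's instances.
cassandra_sink_status() {
  "${PULSAR_ADMIN:-bin/pulsar-admin}" sinks status \
    --tenant public \
    --namespace default \
    --name cassandra-test-sink
}
```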
Note: For the current built-in connectors, the value of the sink-type parameter is determined by the name parameter specified in the connector's pulsar-io.yaml file.
This section demonstrates how to connect Pulsar to PostgreSQL.
Tip: Make sure you have Docker installed. If you do not, see install Docker. For more information about Docker commands, see Docker CLI.
The JDBC sink connector pulls messages from Pulsar topics and persists the messages to ClickHouse, MariaDB, PostgreSQL, or SQLite. For more information, see JDBC sink connector.
To connect Pulsar to PostgreSQL, you can follow the steps below:
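The first step, starting PostgreSQL in Docker, can be sketched as follows. The container name pulsar-postgres matches the commands below; the postgres:12 image tag and the postgres/password credentials are illustrative assumptions, and the same credentials must appear in the JDBC sink configuration. The function name is hypothetical.

```shell
# Start PostgreSQL 12 as the container pulsar-postgres, publishing port 5432.
# The user and password values are illustrative.
start_postgres() {
  docker run -d -it --rm \
    --name pulsar-postgres \
    -p 5432:5432 \
    -e POSTGRES_PASSWORD=password \
    -e POSTGRES_USER=postgres \
    postgres:12
}
```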
Check whether PostgreSQL has started successfully.
docker logs -f pulsar-postgres
PostgreSQL has been started successfully if the following message appears.
2020-05-11 20:09:24.492 UTC [1] LOG: starting PostgreSQL 12.2 (Debian 12.2-2.pgdg100+1) on x86_64-pc-linux-gnu, compiled by gcc (Debian 8.3.0-6) 8.3.0, 64-bit
2020-05-11 20:09:24.492 UTC [1] LOG: listening on IPv4 address "0.0.0.0", port 5432
2020-05-11 20:09:24.492 UTC [1] LOG: listening on IPv6 address "::", port 5432
2020-05-11 20:09:24.499 UTC [1] LOG: listening on Unix socket "/var/run/postgresql/.s.PGSQL.5432"
2020-05-11 20:09:24.523 UTC [55] LOG: database system was shut down at 2020-05-11 20:09:24 UTC
2020-05-11 20:09:24.533 UTC [1] LOG: database system is ready to accept connections
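Instead of watching docker logs -f, a script can poll the container logs for the last line shown above. This is a sketch with a hypothetical function name. On a fresh container the postgres image may log this line once during initialization before its final restart, so treat it as a convenience check rather than a guarantee.

```shell
# Poll the pulsar-postgres logs until PostgreSQL reports it is ready, or give up.
# Usage: wait_for_postgres [max_tries] [seconds_between_tries]
wait_for_postgres() {
  local tries="${1:-30}" delay="${2:-2}" i=0
  while [ "$i" -lt "$tries" ]; do
    if docker logs pulsar-postgres 2>&1 | grep -q "database system is ready to accept connections"; then
      echo "PostgreSQL is ready"
      return 0
    fi
    i=$((i + 1))
    sleep "$delay"
  done
  echo "PostgreSQL did not become ready" >&2
  return 1
}
```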
Access the PostgreSQL container.
docker exec -it pulsar-postgres /bin/bash
Log in to PostgreSQL with the default username and password:
psql -U postgres postgres
Create a pulsar_postgres_jdbc_sink table using the following command:
create table if not exists pulsar_postgres_jdbc_sink
(
  id serial PRIMARY KEY,
  name VARCHAR(255) NOT NULL
);
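The sink itself is created with pulsar-admin after you exit psql. The following is a sketch under several assumptions: the JDBC sink properties userName, password, jdbcUrl, and tableName; the illustrative credentials postgres/password; an AVRO schema whose fields mirror the id and name columns; and a connector NAR named pulsar-io-jdbc-postgres-3.3.2.nar in connectors/. The function names and the PULSAR_ADMIN variable (defaulting to bin/pulsar-admin) are this sketch's own.

```shell
# Write the JDBC sink config and an AVRO schema for the input topic into <dir>.
write_jdbc_sink_files() {
  local dir="${1:-connectors}"
  mkdir -p "$dir"
  cat > "$dir/pulsar-postgres-jdbc-sink.yaml" <<'EOF'
configs:
  userName: "postgres"
  password: "password"
  jdbcUrl: "jdbc:postgresql://localhost:5432/postgres"
  tableName: "pulsar_postgres_jdbc_sink"
EOF
  # The record fields mirror the table columns id and name.
  cat > "$dir/avro-schema" <<'EOF'
{
  "type": "AVRO",
  "schema": "{\"type\":\"record\",\"name\":\"Test\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"]},{\"name\":\"name\",\"type\":[\"null\",\"string\"]}]}",
  "properties": {}
}
EOF
}

# Upload the topic schema, then create the sink from the JDBC PostgreSQL NAR.
create_jdbc_sink() {
  local dir="${1:-connectors}"
  local admin="${PULSAR_ADMIN:-bin/pulsar-admin}"
  "$admin" schemas upload pulsar-postgres-jdbc-sink-topic -f "$dir/avro-schema" &&
  "$admin" sinks create \
    --archive "$dir/pulsar-io-jdbc-postgres-3.3.2.nar" \
    --inputs pulsar-postgres-jdbc-sink-topic \
    --name pulsar-postgres-jdbc-sink \
    --sink-config-file "$dir/pulsar-postgres-jdbc-sink.yaml" \
    --parallelism 1
}
```

Run write_jdbc_sink_files and then create_jdbc_sink from the Pulsar root directory.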
Once the pulsar-admin sinks create command is executed, Pulsar creates the sink connector pulsar-postgres-jdbc-sink.
This sink connector runs as a Pulsar Function and writes the messages produced in the topic pulsar-postgres-jdbc-sink-topic to the PostgreSQL table pulsar_postgres_jdbc_sink.
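Restarting the sink and checking its status can be sketched as follows. The function names and the PULSAR_ADMIN variable (defaulting to bin/pulsar-admin) are this sketch's own, and public/default is assumed to be the namespace the sink runs in.

```shell
# Restart all instances of the JDBC sink.
restart_jdbc_sink() {
  "${PULSAR_ADMIN:-bin/pulsar-admin}" sinks restart \
    --tenant public --namespace default --name pulsar-postgres-jdbc-sink
}

# Show the runtime status of the JDBC sink's instances.
jdbc_sink_status() {
  "${PULSAR_ADMIN:-bin/pulsar-admin}" sinks status \
    --tenant public --namespace default --name pulsar-postgres-jdbc-sink
}
```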
For more information about pulsar-admin sinks restart options, see Pulsar admin docs.
The sink instance has been started successfully if the following message appears.
Started successfully
Tip: Optionally, you can run a standalone sink connector using pulsar-admin sinks localrun options.
Note that pulsar-admin sinks localrun options runs a sink connector locally, while pulsar-admin sinks start options starts a sink connector in a cluster.
For more information about pulsar-admin sinks localrun options, see Pulsar admin docs.
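A sketch of a localrun invocation for the JDBC sink. It assumes the connector NAR pulsar-io-jdbc-postgres-3.3.2.nar and a config file pulsar-postgres-jdbc-sink.yaml in connectors/ (both illustrative names). The function name and the PULSAR_ADMIN variable (defaulting to bin/pulsar-admin) are hypothetical.

```shell
# Run the JDBC sink locally in the foreground instead of submitting it to the cluster.
run_jdbc_sink_locally() {
  "${PULSAR_ADMIN:-bin/pulsar-admin}" sinks localrun \
    --archive ./connectors/pulsar-io-jdbc-postgres-3.3.2.nar \
    --tenant public \
    --namespace default \
    --inputs pulsar-postgres-jdbc-sink-topic \
    --name pulsar-postgres-jdbc-sink \
    --sink-config-file ./connectors/pulsar-postgres-jdbc-sink.yaml \
    --parallelism 1
}
```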