These instructions assume you are running Pulsar in standalone mode. However, all
the commands used in this tutorial can be used in a multi-node Pulsar cluster without any changes.
All the instructions are assumed to run at the root directory of a Pulsar binary distribution.
[{"name":"aerospike","description":"Aerospike database sink","sinkClass":"org.apache.pulsar.io.aerospike.AerospikeStringSink"},{"name":"cassandra","description":"Writes data into Cassandra","sinkClass":"org.apache.pulsar.io.cassandra.CassandraStringSink"},{"name":"kafka","description":"Kafka source and sink connector","sourceClass":"org.apache.pulsar.io.kafka.KafkaStringSource","sinkClass":"org.apache.pulsar.io.kafka.KafkaBytesSink"},{"name":"kinesis","description":"Kinesis sink connector","sinkClass":"org.apache.pulsar.io.kinesis.KinesisSink"},{"name":"rabbitmq","description":"RabbitMQ source connector","sourceClass":"org.apache.pulsar.io.rabbitmq.RabbitMQSource"},{"name":"twitter","description":"Ingest data from Twitter firehose","sourceClass":"org.apache.pulsar.io.twitter.TwitterFireHose"}]
If an error occurs when starting Pulsar service, you may see an exception at the terminal running pulsar/standalone,
or you can navigate to the logs directory under the Pulsar directory to view the logs.
This section demonstrates how to connect Pulsar to Cassandra.
tip
Make sure you have Docker installed. If you do not have one, see install Docker.
The Cassandra sink connector reads messages from Pulsar topics and writes the messages into Cassandra tables. For more information, see Cassandra sink connector.
$ dockerexec-ti cassandra cqlsh localhost Connected to Test Cluster at localhost:9042. [cqlsh 5.0.1 | Cassandra 3.11.2 | CQL spec 3.4.4 | Native protocol v4] Use HELP for help. cqlsh>
Create a keyspace pulsar_test_keyspace.
cqlsh> CREATE KEYSPACE pulsar_test_keyspace WITH replication ={'class':'SimpleStrategy', 'replication_factor':1};
Create a table pulsar_test_table.
cqlsh> USE pulsar_test_keyspace; cqlsh:pulsar_test_keyspace> CREATE TABLE pulsar_test_table (key text PRIMARY KEY, col text);
Now that we have a Cassandra cluster running locally.
In this section, you need to configure a Cassandra sink connector.
To run a Cassandra sink connector, you need to prepare a configuration file including the information that Pulsar connector runtime needs to know.
For example, how Pulsar connector can find the Cassandra cluster, what is the keyspace and the table that Pulsar connector uses for writing Pulsar messages to, and so on.
You can create a configuration file through one of the following methods.
You can use the Connector Admin CLI
to create a sink connector and perform other operations on them.
Run the following command to create a Cassandra sink connector with sink type cassandra and the config file examples/cassandra-sink.yml created previously.
Keep STDIN open even if not attached and allocate a terminal.
/
--rm
Remove the container automatically when it exits.
/
-name
Assign a name to the container.
This example specifies pulsar-postgres for the container.
-p
Publish the port of the container to the host.
This example publishes the port 5432 of the container to the host.
-e
Set environment variables.
This example sets the following variables: - The password for the user is password. - The name for the user is postgres.
tip
For more information about Docker commands, see Docker CLI.
Check if PostgreSQL has been started successfully.
$ docker logs -f pulsar-postgres
PostgreSQL has been started successfully if the following message appears.
2020-05-11 20:09:24.492 UTC [1] LOG: starting PostgreSQL 12.2 (Debian 12.2-2.pgdg100+1) on x86_64-pc-linux-gnu, compiled by gcc (Debian 8.3.0-6) 8.3.0, 64-bit 2020-05-11 20:09:24.492 UTC [1] LOG: listening on IPv4 address "0.0.0.0", port 5432 2020-05-11 20:09:24.492 UTC [1] LOG: listening on IPv6 address "::", port 5432 2020-05-11 20:09:24.499 UTC [1] LOG: listening on Unix socket "/var/run/postgresql/.s.PGSQL.5432" 2020-05-11 20:09:24.523 UTC [55] LOG: database system was shut down at 2020-05-11 20:09:24 UTC 2020-05-11 20:09:24.533 UTC [1] LOG: database system is ready to accept connections
Access to PostgreSQL.
$ dockerexec-it pulsar-postgres /bin/bash
Create a PostgreSQL table pulsar_postgres_jdbc_sink.
$ psql -U postgres postgres postgres=# create table if not exists pulsar_postgres_jdbc_sink ( id serial PRIMARY KEY, name VARCHAR(255) NOT NULL );
Once the command is executed, Pulsar creates a sink connector pulsar-postgres-jdbc-sink.
This sink connector runs as a Pulsar Function and writes the messages produced in the topic pulsar-postgres-jdbc-sink-topic to the PostgreSQL table pulsar_postgres_jdbc_sink.
For more information about pulsar-admin sinks restart options, see Pulsar admin docs.
The sink instance has been started successfully if the following message disappears.
Started successfully
tip
Optionally, you can run a standalone sink connector using pulsar-admin sinks localrun options.
Note that pulsar-admin sinks localrun optionsruns a sink connector locally, while pulsar-admin sinks start optionsstarts a sink connector in a cluster.
For more information about pulsar-admin sinks localrun options, see Pulsar admin docs.