Skip to main content

How to use Pulsar connectors

This guide describes how to use Pulsar connectors.

Install a connector

Pulsar bundles several built-in connectors used to move data in and out of commonly used systems (such as database and messaging system). Optionally, you can create and use your desired non-built-in connectors.

note

When using a non-built-in connector, you need to specify the path of an archive file for the connector.

To set up a built-in connector, follow the instructions.

After the setup, the built-in connector is automatically discovered by Pulsar brokers (or function-workers), so no additional installation steps are required.

Configure a connector

You can configure the following information:

Configure a default storage location for a built-in connector

To configure a default folder for built-in connectors, set the connectorsDirectory parameter in the ./conf/functions_worker.yml configuration file.

Example

Set the ./connectors folder as the default storage location for built-in connectors.

########################
# Connectors
########################

connectorsDirectory: ./connectors

Configure a connector with a YAML file

To configure a connector, you need to provide a YAML configuration file when creating a connector.

The YAML configuration file tells Pulsar where to locate connectors and how to connect connectors with Pulsar topics.

Example 1

Below is a YAML configuration file of a Cassandra sink, which tells Pulsar:

  • Which Cassandra cluster to connect

  • What is the keyspace and columnFamily to be used in Cassandra for collecting data

  • How to map Pulsar messages into Cassandra table key and columns

tenant: public
namespace: default
name: cassandra-test-sink
...
# cassandra specific config
configs:
roots: "localhost:9042"
keyspace: "pulsar_test_keyspace"
columnFamily: "pulsar_test_table"
keyname: "key"
columnName: "col"

Example 2

Below is a YAML configuration file of a Kafka source.

configs:
bootstrapServers: "pulsar-kafka:9092"
groupId: "test-pulsar-io"
topic: "my-topic"
sessionTimeoutMs: "10000"
autoCommitEnabled: "false"

Example 3

Below is a YAML configuration file of a PostgreSQL JDBC sink.

configs:
userName: "postgres"
password: "password"
jdbcUrl: "jdbc:postgresql://localhost:5432/test_jdbc"
tableName: "test_jdbc"

Prepare a connector

Before starting using connectors, you can perform the following operations:

reload

If you add or delete a nar file in a connector folder, reload the available built-in connector before using it.

Source

To reload source connectors, you can use the reload subcommand.

pulsar-admin sources reload

For the latest and complete information, see Pulsar admin docs.

Sink

To reload sink connectors, you can use the reload subcommand.

pulsar-admin sinks reload

For the latest and complete information, see Pulsar admin docs.

available

After reloading connectors (optional), you can get a list of available connectors.

Source

To get a list of source connectors, you can use the available-sources subcommand.

pulsar-admin sources available-sources

Sink

To get a list of sink connectors, you can use the available-sinks subcommand.

pulsar-admin sinks available-sinks

Run a connector

To run a connector, you can perform the following operations:

create

To create a connector, you can use Admin CLI, REST API or JAVA admin API.

Source

To create a source connector, you can use the following commands:

Use the create subcommand.

pulsar-admin sources create options

For the latest and complete information, see Pulsar admin docs.

Sink

To create a sink connector, you can use the following commands:

Use the create subcommand.

pulsar-admin sinks create options

For the latest and complete information, see Pulsar admin docs.

start

To start a connector, you can use Admin CLI or REST API.

Source

To start a source connector, you can use the following commands.

Use the start subcommand.

pulsar-admin sources start options

For the latest and complete information, see Pulsar admin docs.

Sink

To start a sink connector, you can use the following commands:

Use the start subcommand.

pulsar-admin sinks start options

For the latest and complete information, see Pulsar admin docs.

localrun

To run a connector locally rather than deploying it on a Pulsar cluster, you can use Admin CLI

Source

To run a source connector locally, you can use the following command:

Use the localrun subcommand.

pulsar-admin sources localrun options

For the latest and complete information, see Pulsar admin docs.

Sink

To run a sink connector locally, you can use the following command:

Use the localrun subcommand.

pulsar-admin sinks localrun options

For the latest and complete information, see Pulsar admin docs.

Run a Pulsar Function before a sink connector

You can run a Pulsar Function in memory before a sink connector. For details, see PIP 193: Sink preprocessing Function. Running a Pulsar Function in memory before a sink connector provides lower latency, less I/O, and disk consumption than going through an intermediate topic. Use the --transform-function, --transform-function-classname and --transform-function-config options when creating the sink connector to configure the transform Function to run.

For the latest and complete information, see Pulsar admin sinks command docs.

Monitor a connector

To monitor a connector, you can perform the following operations:

get

To get the information of a connector, You can use Admin CLI, REST API or JAVA admin API.

Source

To get the information of a source connector, you can use the following commands:

Use the get subcommand.

pulsar-admin sources get options

For the latest and complete information, see Pulsar admin docs.

Sink

To get the information of a sink connector, you can use the following commands:

Use the get subcommand.

pulsar-admin sinks get options

For the latest and complete information, see Pulsar admin docs.

list

To get the list of all running connectors, You can use Admin CLI, REST API or JAVA admin API.

Source

To get the list of all running source connectors, you can use the following commands:

Use the list subcommand.

pulsar-admin sources list options

For the latest and complete information, see Pulsar admin docs.

Sink

To get the list of all running sink connectors, you can use the following commands:

Use the list subcommand.

pulsar-admin sinks list options

For the latest and complete information, see Pulsar admin docs.

status

To get the current status of a connector, you can use Admin CLI, REST API or JAVA admin API.

Source

To get the current status of a source connector, you can use the following commands:

Use the status subcommand.

pulsar-admin sources status options

For the latest and complete information, see Pulsar admin docs.

Sink

To get the current status of a Pulsar sink connector, you can use the following commads:

Use the status subcommand.

pulsar-admin sinks status options

For the latest and complete information, see Pulsar admin docs.

Update a connector

update

To update a running connector, you can use Admin CLI, REST API or JAVA admin API.

Source

To update a running Pulsar source connector, you can use the following commands:

Use the update subcommand.

pulsar-admin sources update options

For the latest and complete information, see Pulsar admin docs.

Sink

To update a running Pulsar sink connector, you can use the following commands:

Use the update subcommand.

pulsar-admin sinks update options

For the latest and complete information, see Pulsar admin docs.

Stop a connector

stop

To stop a connector, you can use Admin CLI, REST API or JAVA admin API.

Source

To stop a source connector, you can use the following commands:

Use the stop subcommand.

pulsar-admin sources stop options

For the latest and complete information, see Pulsar admin docs.

Sink

To stop a sink connector, you can use the following commands:

Use the stop subcommand.

pulsar-admin sinks stop options

For the latest and complete information, see Pulsar admin docs.

Restart a connector

restart

To restart a connector, you can use Admin CLI, REST API or JAVA admin API.

Source

To restart a source connector, you can use the following commands:

Use the restart subcommand.

pulsar-admin sources restart options

For the latest and complete information, see Pulsar admin docs.

Sink

To restart a sink connector, you can use the following commands:

Use the restart subcommand.

pulsar-admin sinks restart options

For the latest and complete information, see Pulsar admin docs.

Delete a connector

delete

To delete a connector, you can use Admin CLI, REST API or JAVA admin API.

Source

To delete a source connector, you can use the following commands:

Use the delete subcommand.

pulsar-admin sources delete options

For the latest and complete information, see Pulsar admin docs.

Sink

To delete a sink connector, you can use the following commands:

Use the delete subcommand.

pulsar-admin sinks delete options

For the latest and complete information, see Pulsar admin docs.