How to use Pulsar connectors

This guide describes how to use Pulsar connectors.

Install a connector

Pulsar bundles several built-in connectors that move data in and out of commonly used systems, such as databases and messaging systems. You can also create and use your own (non-built-in) connectors.

note

When using a non-built-in connector, you need to specify the path of an archive file for the connector.

To set up a built-in connector, follow the instructions.

After the setup, the built-in connector is automatically discovered by Pulsar brokers (or function-workers), so no additional installation steps are required.

Configure a connector​

You can configure the following information:

Configure a default storage location for a connector​

To configure a default folder for built-in connectors, set the connectorsDirectory parameter in the ./conf/functions_worker.yml configuration file.

Example

Set the ./connectors folder as the default storage location for built-in connectors.

########################
# Connectors
########################

connectorsDirectory: ./connectors

Configure a connector with a YAML file​

To configure a connector, you need to provide a YAML configuration file when creating a connector.

The YAML configuration file tells Pulsar where to locate connectors and how to connect connectors with Pulsar topics.

Example 1

Below is a YAML configuration file of a Cassandra sink, which tells Pulsar:

  • Which Cassandra cluster to connect to

  • Which keyspace and columnFamily to use in Cassandra for collecting data

  • How to map Pulsar messages to the Cassandra table key and columns

tenant: public
namespace: default
name: cassandra-test-sink
...
# cassandra specific config
configs:
  roots: "localhost:9042"
  keyspace: "pulsar_test_keyspace"
  columnFamily: "pulsar_test_table"
  keyname: "key"
  columnName: "col"

Example 2

Below is a YAML configuration file of a Kafka source.

configs:
  bootstrapServers: "pulsar-kafka:9092"
  groupId: "test-pulsar-io"
  topic: "my-topic"
  sessionTimeoutMs: "10000"
  autoCommitEnabled: "false"

Example 3

Below is a YAML configuration file of a PostgreSQL JDBC sink.

configs:
  userName: "postgres"
  password: "password"
  jdbcUrl: "jdbc:postgresql://localhost:5432/test_jdbc"
  tableName: "test_jdbc"

Get available connectors

Before you start using connectors, you can perform the following operations:

reload

If you add or delete a NAR file in the connectors folder, reload the available built-in connectors before using them.

Source​

Use the reload subcommand.

pulsar-admin sources reload

For the latest and complete information, see Pulsar admin docs.

Sink​

Use the reload subcommand.

pulsar-admin sinks reload

For the latest and complete information, see Pulsar admin docs.
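The add-then-reload sequence can be sketched as a small shell function. The function name `install_connector_archive`, the `PULSAR_ADMIN` override variable, and both paths are assumptions made for illustration; only the two `reload` subcommands come from this guide. The sketch also assumes that it runs on the host that holds the connectors folder.

```shell
# Hypothetical helper: copy a connector NAR into the connectors folder,
# then ask Pulsar to rescan the built-in sources and sinks.
# PULSAR_ADMIN is an illustrative override; set it to "echo" for a dry run.
install_connector_archive() {
  nar_file="$1"        # path to the .nar archive to add
  connectors_dir="$2"  # folder configured as connectorsDirectory
  cp "$nar_file" "$connectors_dir"/ || return 1
  "${PULSAR_ADMIN:-pulsar-admin}" sources reload
  "${PULSAR_ADMIN:-pulsar-admin}" sinks reload
}
```

Calling the function with `PULSAR_ADMIN=echo` set prints the two reload commands instead of running them, which is a cheap way to check the sequence before touching a cluster.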

available​

After reloading connectors (optional), you can get a list of available connectors.

Source​

Use the available-sources subcommand.

pulsar-admin sources available-sources

Sink​

Use the available-sinks subcommand.

pulsar-admin sinks available-sinks

Run a connector

To run a connector, you can perform the following operations:

create

You can create a connector using Admin CLI, REST API, or Java admin API.

Source​

Create a source connector.

Use the create subcommand.

pulsar-admin sources create options

For the latest and complete information, see Pulsar admin docs.
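As an illustration of what `options` might look like, here is a minimal sketch that creates a Kafka source from a YAML file like the one in Example 2. The connector name `kafka-test-source`, the destination topic `my-topic`, the function name, and the `PULSAR_ADMIN` override are assumptions, not values prescribed by this guide.

```shell
# Hypothetical wrapper around `pulsar-admin sources create` for a Kafka source.
# PULSAR_ADMIN is an illustrative override; set it to "echo" for a dry run.
create_kafka_source() {
  config_file="$1"  # YAML file holding the Kafka "configs" section
  "${PULSAR_ADMIN:-pulsar-admin}" sources create \
    --tenant public \
    --namespace default \
    --name kafka-test-source \
    --source-type kafka \
    --destination-topic-name my-topic \
    --source-config-file "$config_file"
}
```

`--source-type` selects a built-in connector by name; for a non-built-in connector you would pass `--archive` with the path of the archive file instead.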

Sink​

Create a sink connector.

Use the create subcommand.

pulsar-admin sinks create options

For the latest and complete information, see Pulsar admin docs.
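For a sink, a comparable sketch creates the Cassandra sink described in Example 1. The input topic `test_cassandra`, the function name, and the `PULSAR_ADMIN` override are assumptions chosen for illustration.

```shell
# Hypothetical wrapper around `pulsar-admin sinks create` for a Cassandra sink.
# PULSAR_ADMIN is an illustrative override; set it to "echo" for a dry run.
create_cassandra_sink() {
  config_file="$1"  # YAML file holding the Cassandra "configs" section
  "${PULSAR_ADMIN:-pulsar-admin}" sinks create \
    --tenant public \
    --namespace default \
    --name cassandra-test-sink \
    --sink-type cassandra \
    --sink-config-file "$config_file" \
    --inputs test_cassandra
}
```

`--inputs` names the Pulsar topic (or comma-separated topics) that the sink reads from.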

start​

You can start a connector using Admin CLI or REST API.

Source​

Start a source connector.

Use the start subcommand.

pulsar-admin sources start options

For the latest and complete information, see Pulsar admin docs.

Sink​

Start a sink connector.

Use the start subcommand.

pulsar-admin sinks start options

For the latest and complete information, see Pulsar admin docs.

localrun

Using Admin CLI, you can run a connector locally rather than deploying it on a Pulsar cluster.

Source​

Run a source connector locally.

Use the localrun subcommand.

pulsar-admin sources localrun options

For the latest and complete information, see Pulsar admin docs.

Sink​

Run a sink connector locally.

Use the localrun subcommand.

pulsar-admin sinks localrun options

For the latest and complete information, see Pulsar admin docs.
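A local run needs to know which broker to talk to and where the connector archive is. The sketch below assumes a broker at `pulsar://localhost:6650`, a sink named `test-jdbc-sink` that reads from the topic `test_jdbc`, and caller-supplied paths for the archive and the YAML file; all of these, along with the function name and the `PULSAR_ADMIN` override, are illustrative.

```shell
# Hypothetical wrapper around `pulsar-admin sinks localrun` for a JDBC sink.
# PULSAR_ADMIN is an illustrative override; set it to "echo" for a dry run.
localrun_jdbc_sink() {
  archive="$1"      # path to the connector .nar archive
  config_file="$2"  # YAML file holding the JDBC "configs" section
  "${PULSAR_ADMIN:-pulsar-admin}" sinks localrun \
    --broker-service-url pulsar://localhost:6650 \
    --archive "$archive" \
    --tenant public \
    --namespace default \
    --name test-jdbc-sink \
    --inputs test_jdbc \
    --sink-config-file "$config_file"
}
```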

Run a Pulsar Function before a sink connector

You can run a Pulsar Function in memory before a sink connector. Compared with routing messages through an intermediate topic, this gives lower latency and uses less I/O and disk space. For details, see PIP 193: Sink preprocessing Function.

To configure the transform Function, use the --transform-function, --transform-function-classname, and --transform-function-config options when creating the sink connector.

For the latest and complete information, see Pulsar admin sinks command docs.
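A sink with a transform Function might be created along the following lines. This is a sketch under stated assumptions: the Function package path and class name are supplied by the caller and are hypothetical, the sink settings reuse the Cassandra example, and the function name and `PULSAR_ADMIN` override exist only for illustration. The `--transform-function-config` option is left out because its value depends on the Function.

```shell
# Hypothetical wrapper: create a sink that runs a transform Function in memory.
# PULSAR_ADMIN is an illustrative override; set it to "echo" for a dry run.
create_sink_with_transform() {
  config_file="$1"     # YAML file holding the sink "configs" section
  function_pkg="$2"    # hypothetical path to the Function package
  function_class="$3"  # hypothetical fully qualified Function class name
  "${PULSAR_ADMIN:-pulsar-admin}" sinks create \
    --tenant public \
    --namespace default \
    --name cassandra-test-sink \
    --sink-type cassandra \
    --sink-config-file "$config_file" \
    --inputs test_cassandra \
    --transform-function "$function_pkg" \
    --transform-function-classname "$function_class"
}
```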

Monitor a connector

To monitor a connector, you can perform the following operations:

get

You can get the information of a connector using Admin CLI, REST API, or Java admin API.

Source​

Get the information of a source connector.

Use the get subcommand.

pulsar-admin sources get options

For the latest and complete information, see Pulsar admin docs.

Sink​

Get the information of a sink connector.

Use the get subcommand.

pulsar-admin sinks get options

For the latest and complete information, see Pulsar admin docs.

list

You can get the list of all running connectors using Admin CLI, REST API, or Java admin API.

Source​

Get the list of all running source connectors.

Use the list subcommand.

pulsar-admin sources list options

For the latest and complete information, see Pulsar admin docs.

Sink​

Get the list of all running sink connectors.

Use the list subcommand.

pulsar-admin sinks list options

For the latest and complete information, see Pulsar admin docs.

status

You can get the current status of a connector using Admin CLI, REST API, or Java admin API.

Source​

Get the current status of a source connector.

Use the status subcommand.

pulsar-admin sources status options

For the latest and complete information, see Pulsar admin docs.

Sink​

Get the current status of a sink connector.

Use the status subcommand.

pulsar-admin sinks status options

For the latest and complete information, see Pulsar admin docs.
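A status check can target either the whole connector or a single instance. The sketch below assumes the `public` tenant and `default` namespace; the function name and the `PULSAR_ADMIN` override are illustrative.

```shell
# Hypothetical wrapper around `pulsar-admin sinks status`.
# PULSAR_ADMIN is an illustrative override; set it to "echo" for a dry run.
sink_status() {
  name="$1"             # sink name, for example cassandra-test-sink
  instance_id="${2:-}"  # optional; omit to report on all instances
  if [ -n "$instance_id" ]; then
    "${PULSAR_ADMIN:-pulsar-admin}" sinks status \
      --tenant public --namespace default --name "$name" \
      --instance-id "$instance_id"
  else
    "${PULSAR_ADMIN:-pulsar-admin}" sinks status \
      --tenant public --namespace default --name "$name"
  fi
}
```

For example, `sink_status cassandra-test-sink 0` asks only about instance 0, while `sink_status cassandra-test-sink` covers every instance.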

Update a connector

update

You can update a running connector using Admin CLI, REST API, or Java admin API.

Source​

Update a running source connector.

Use the update subcommand.

pulsar-admin sources update options

For the latest and complete information, see Pulsar admin docs.

Sink​

Update a running sink connector.

Use the update subcommand.

pulsar-admin sinks update options

For the latest and complete information, see Pulsar admin docs.
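One common update is changing how many instances of a connector run. The sketch below assumes that the running sink keeps its existing archive and configuration and that only `--parallelism` changes; the function name and the `PULSAR_ADMIN` override are illustrative.

```shell
# Hypothetical wrapper: change the number of instances of a running sink.
# PULSAR_ADMIN is an illustrative override; set it to "echo" for a dry run.
scale_sink() {
  name="$1"         # sink name, for example cassandra-test-sink
  parallelism="$2"  # desired number of instances
  "${PULSAR_ADMIN:-pulsar-admin}" sinks update \
    --tenant public \
    --namespace default \
    --name "$name" \
    --parallelism "$parallelism"
}
```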

Stop a connector

stop

You can stop a connector using Admin CLI, REST API, or Java admin API.

Source​

Stop a source connector.

Use the stop subcommand.

pulsar-admin sources stop options

For the latest and complete information, see Pulsar admin docs.

Sink​

Stop a sink connector.

Use the stop subcommand.

pulsar-admin sinks stop options

For the latest and complete information, see Pulsar admin docs.

Restart a connector

restart

You can restart a connector using Admin CLI, REST API, or Java admin API.

Source​

Restart a source connector.

Use the restart subcommand.

pulsar-admin sources restart options

For the latest and complete information, see Pulsar admin docs.

Sink​

Restart a sink connector.

Use the restart subcommand.

pulsar-admin sinks restart options

For the latest and complete information, see Pulsar admin docs.

Delete a connector

delete

You can delete a connector using Admin CLI, REST API, or Java admin API.

Source​

Delete a source connector.

Use the delete subcommand.

pulsar-admin sources delete options

For the latest and complete information, see Pulsar admin docs.

Sink​

Delete a sink connector.

Use the delete subcommand.

pulsar-admin sinks delete options

For the latest and complete information, see Pulsar admin docs.
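Because the source and sink forms of `delete` differ only in the command group, both can be covered by one small sketch. The function name, the `PULSAR_ADMIN` override, and the fixed `public`/`default` tenant and namespace are assumptions for illustration.

```shell
# Hypothetical wrapper around `pulsar-admin sources delete` / `sinks delete`.
# PULSAR_ADMIN is an illustrative override; set it to "echo" for a dry run.
delete_connector() {
  kind="$1"  # "sources" or "sinks"
  name="$2"  # connector name
  case "$kind" in
    sources|sinks) ;;
    *) echo "kind must be 'sources' or 'sinks'" >&2; return 2 ;;
  esac
  "${PULSAR_ADMIN:-pulsar-admin}" "$kind" delete \
    --tenant public \
    --namespace default \
    --name "$name"
}
```

For example, `delete_connector sinks cassandra-test-sink` removes the sink created earlier, and any other value for the first argument is rejected before a command is issued.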