How to use Pulsar connectors

This guide describes how to use Pulsar connectors.

Install a connector

Pulsar bundles several builtin connectors that move data in and out of commonly used systems (such as databases and messaging systems). Optionally, you can create and use your own non-builtin connectors.

Note

When using a non-builtin connector, you need to specify the path of an archive file for the connector.

To set up a builtin connector, follow the instructions here.

After the setup, the builtin connector is automatically discovered by Pulsar brokers (or function-workers), so no additional installation steps are required.

Configure a connector

You can configure the following information:

Configure a default storage location for a connector

To configure a default folder for builtin connectors, set the connectorsDirectory parameter in the ./conf/functions_worker.yml configuration file.

Example

Set the ./connectors folder as the default storage location for builtin connectors.


########################
# Connectors
########################

connectorsDirectory: ./connectors
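
As a quick check that the configured folder is populated, the sketch below lists the connector archives it contains. The `list_connector_archives` helper is hypothetical (it is not part of Pulsar), and it assumes connectors are packaged as `.nar` files placed directly under the folder.

```shell
# Hypothetical helper: print the NAR archives found in a connectors folder.
# Assumes connectors are packaged as *.nar files directly under the folder.
list_connector_archives() {
  dir="${1:-./connectors}"
  for archive in "$dir"/*.nar; do
    # If nothing matches, the glob stays literal, so keep real files only.
    if [ -f "$archive" ]; then
      basename "$archive"
    fi
  done
}
```

Calling `list_connector_archives ./connectors` prints one archive name per line, and prints nothing when the folder holds no NAR files.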

Configure a connector with a YAML file

To configure a connector, you need to provide a YAML configuration file when creating a connector.

The YAML configuration file tells Pulsar where to locate connectors and how to connect connectors with Pulsar topics.

Example 1

Below is a YAML configuration file of a Cassandra sink, which tells Pulsar:

  • Which Cassandra cluster to connect to
  • Which keyspace and columnFamily to use in Cassandra for collecting data
  • How to map Pulsar messages to the key and columns of the Cassandra table

tenant: public
namespace: default
name: cassandra-test-sink
...
# cassandra specific config
configs:
    roots: "localhost:9042"
    keyspace: "pulsar_test_keyspace"
    columnFamily: "pulsar_test_table"
    keyname: "key"
    columnName: "col"
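
To show how such a file is typically used, here is a minimal sketch that writes the settings from Example 1 to disk and passes the file to `pulsar-admin sinks create`. The two function names and the input topic are hypothetical, and the fields elided with `...` in the example are left out rather than guessed.

```shell
# Write the Cassandra sink settings from Example 1 to a YAML file.
# The fields elided ("...") in the example are left out here, not guessed.
write_cassandra_sink_config() {
  cat > "$1" <<'EOF'
tenant: public
namespace: default
name: cassandra-test-sink
configs:
    roots: "localhost:9042"
    keyspace: "pulsar_test_keyspace"
    columnFamily: "pulsar_test_table"
    keyname: "key"
    columnName: "col"
EOF
}

# Create the sink from that file.
# $1 = config file path, $2 = input topic (any name you choose).
create_cassandra_sink() {
  pulsar-admin sinks create \
    --sink-type cassandra \
    --sink-config-file "$1" \
    --inputs "$2"
}
```

For example, `write_cassandra_sink_config ./cassandra-sink.yml` followed by `create_cassandra_sink ./cassandra-sink.yml test_cassandra` would submit the sink, assuming `pulsar-admin` is on the PATH and the Cassandra connector is installed.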

Example 2

Below is a YAML configuration file of a Kafka source.


configs:
    bootstrapServers: "pulsar-kafka:9092"
    groupId: "test-pulsar-io"
    topic: "my-topic"
    sessionTimeoutMs: "10000"
    autoCommitEnabled: "false"

Example 3

Below is a YAML configuration file of a PostgreSQL JDBC sink.


configs:
    userName: "postgres"
    password: "password"
    jdbcUrl: "jdbc:postgresql://localhost:5432/test_jdbc"
    tableName: "test_jdbc"
Get available connectors

Before you start using connectors, you can perform the following operations:

reload

If you add or delete a NAR file in a connector folder, reload the available builtin connectors before using them.

Source

Use the reload subcommand.


$ pulsar-admin sources reload

For more information, see here.

Sink

Use the reload subcommand.


$ pulsar-admin sinks reload

For more information, see here.

available

After reloading connectors (optional), you can get a list of available connectors.

Source

Use the available-sources subcommand.


$ pulsar-admin sources available-sources

Sink

Use the available-sinks subcommand.


$ pulsar-admin sinks available-sinks
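
The reload and listing steps can be chained into one routine. The sketch below uses only the four subcommands shown in this section; the `refresh_connectors` function name is hypothetical.

```shell
# Reload the builtin connectors, then print what is available.
# Each step runs only if the previous one succeeded.
refresh_connectors() {
  pulsar-admin sources reload &&
    pulsar-admin sinks reload &&
    pulsar-admin sources available-sources &&
    pulsar-admin sinks available-sinks
}
```

Running `refresh_connectors` after copying a new NAR file into the connectors folder is a convenient way to confirm that the connector was picked up.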

Run a connector

To run a connector, you can perform the following operations:

create

You can create a connector using Admin CLI, REST API or Java admin API.

Source

Create a source connector.

Use the create subcommand.


$ pulsar-admin sources create options

For more information, see here.
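
As an illustration of what `options` might look like, the sketch below creates a Kafka source from a YAML file such as the one in Example 2. The `create_kafka_source` function name, the connector name, and the destination topic are hypothetical; the tenant and namespace reuse the `public` and `default` values from Example 1.

```shell
# Create a builtin Kafka source from a YAML configuration file.
# $1 = connector name, $2 = config file path, $3 = destination topic.
create_kafka_source() {
  pulsar-admin sources create \
    --tenant public \
    --namespace default \
    --name "$1" \
    --source-type kafka \
    --source-config-file "$2" \
    --destination-topic-name "$3"
}
```

A call such as `create_kafka_source kafka-test-source ./kafka-source.yml my-pulsar-topic` would then publish the records read from Kafka to the named Pulsar topic.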

Sink

Create a sink connector.

Use the create subcommand.


$ pulsar-admin sinks create options

For more information, see here.

start

You can start a connector using Admin CLI or REST API.

Source

Start a source connector.

Use the start subcommand.


$ pulsar-admin sources start options

For more information, see here.

Sink

Start a sink connector.

Use the start subcommand.


$ pulsar-admin sinks start options

For more information, see here.

localrun

Using Admin CLI, you can run a connector locally rather than deploying it on a Pulsar cluster.

Source

Run a source connector locally.

Use the localrun subcommand.


$ pulsar-admin sources localrun options

For more information, see here.

Sink

Run a sink connector locally.

Use the localrun subcommand.


$ pulsar-admin sinks localrun options

For more information, see here.
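
A local run is a common way to try out a non-builtin connector, for which the archive path has to be given explicitly. The sketch below shows one possible invocation; the `localrun_sink` function name and all argument values are hypothetical, and the tenant and namespace again reuse `public` and `default`.

```shell
# Run a sink locally from an explicit archive file.
# $1 = archive path, $2 = connector name, $3 = config file, $4 = input topic.
localrun_sink() {
  pulsar-admin sinks localrun \
    --archive "$1" \
    --tenant public \
    --namespace default \
    --name "$2" \
    --sink-config-file "$3" \
    --inputs "$4"
}
```

The command stays in the foreground for as long as the connector runs, so it is usually started in its own terminal.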

Monitor a connector

To monitor a connector, you can perform the following operations:

get

You can get the information of a connector using Admin CLI, REST API or Java admin API.

Source

Get the information of a source connector.

Use the get subcommand.


$ pulsar-admin sources get options

For more information, see here.

Sink

Get the information of a sink connector.

Use the get subcommand.


$ pulsar-admin sinks get options

For more information, see here.

list

You can get the list of all running connectors using Admin CLI, REST API or Java admin API.

Source

Get the list of all running source connectors.

Use the list subcommand.


$ pulsar-admin sources list options

For more information, see here.

Sink

Get the list of all running sink connectors.

Use the list subcommand.


$ pulsar-admin sinks list options

For more information, see here.

status

You can get the current status of a connector using Admin CLI, REST API or Java admin API.

Source

Get the current status of a source connector.

Use the status subcommand.


$ pulsar-admin sources status options

For more information, see here.

Sink

Get the current status of a Pulsar sink connector.

Use the status subcommand.


$ pulsar-admin sinks status options

For more information, see here.
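
Because the source and sink commands differ only in their first word, a small wrapper can serve both. The sketch below is one way to write it; the `connector_status` function name is hypothetical, and the tenant and namespace are fixed to `public` and `default` for brevity.

```shell
# Query the status of a source or sink by name.
# $1 = "sources" or "sinks", $2 = connector name.
connector_status() {
  case "$1" in
    sources|sinks) ;;
    *)
      echo "usage: connector_status sources|sinks NAME" >&2
      return 2
      ;;
  esac
  pulsar-admin "$1" status --tenant public --namespace default --name "$2"
}
```

For instance, `connector_status sinks cassandra-test-sink` queries the sink from Example 1, while any other first argument is rejected with a usage message.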

Update a connector

update

You can update a running connector using Admin CLI, REST API or Java admin API.

Source

Update a running Pulsar source connector.

Use the update subcommand.


$ pulsar-admin sources update options

For more information, see here.

Sink

Update a running Pulsar sink connector.

Use the update subcommand.


$ pulsar-admin sinks update options

For more information, see here.
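
One typical update is changing how many instances of a connector run. The sketch below assumes that use case and passes the `--parallelism` option to `sinks update`; the `scale_sink` function name is hypothetical, and the tenant and namespace are fixed to `public` and `default`.

```shell
# Change the number of running instances of an existing sink.
# $1 = connector name, $2 = desired parallelism.
scale_sink() {
  pulsar-admin sinks update \
    --tenant public \
    --namespace default \
    --name "$1" \
    --parallelism "$2"
}
```

For example, `scale_sink cassandra-test-sink 2` asks Pulsar to run two instances of the sink from Example 1.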

Stop a connector

stop

You can stop a connector using Admin CLI, REST API or Java admin API.

Source

Stop a source connector.

Use the stop subcommand.


$ pulsar-admin sources stop options

For more information, see here.

Sink

Stop a sink connector.

Use the stop subcommand.


$ pulsar-admin sinks stop options

For more information, see here.

Restart a connector

restart

You can restart a connector using Admin CLI, REST API or Java admin API.

Source

Restart a source connector.

Use the restart subcommand.


$ pulsar-admin sources restart options

For more information, see here.

Sink

Restart a sink connector.

Use the restart subcommand.


$ pulsar-admin sinks restart options

For more information, see here.

Delete a connector

delete

You can delete a connector using Admin CLI, REST API or Java admin API.

Source

Delete a source connector.

Use the delete subcommand.


$ pulsar-admin sources delete options

For more information, see here.

Sink

Delete a sink connector.

Use the delete subcommand.


$ pulsar-admin sinks delete options

For more information, see here.
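
A cautious teardown might stop a connector before deleting it. The sketch below chains the two subcommands for a sink; stopping first is a design choice made here, not a documented requirement, and the `remove_sink` function name is hypothetical.

```shell
# Stop a sink, then delete it only if the stop succeeded.
# $1 = connector name.
remove_sink() {
  pulsar-admin sinks stop --tenant public --namespace default --name "$1" &&
    pulsar-admin sinks delete --tenant public --namespace default --name "$1"
}
```

Calling `remove_sink cassandra-test-sink` would remove the sink from Example 1; swapping `sinks` for `sources` gives the equivalent for a source connector.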