Version: Next

How to use Pulsar connectors

This guide describes how to use Pulsar connectors.

Install a connector

Pulsar bundles several built-in connectors for moving data in and out of commonly used systems, such as databases and messaging systems. You can also create and use your own non-built-in connectors.

note

When you use a non-built-in connector, you need to specify the path to the connector's archive file.

To set up a built-in connector, follow the instructions.

After the setup, the built-in connector is automatically discovered by Pulsar brokers (or function workers), so no additional installation steps are required.

Configure a connector

You can configure the following information:

Configure a default storage location for a connector

To configure a default folder for built-in connectors, set the connectorsDirectory parameter in the ./conf/functions_worker.yml configuration file.

Example

Set the ./connectors folder as the default storage location for built-in connectors.

########################
# Connectors
########################

connectorsDirectory: ./connectors
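As a rough illustration of what this setting controls, the following Python sketch reads `connectorsDirectory` from the worker configuration text and lists the `.nar` archives in that folder. It is not Pulsar's discovery code: the naive line scan (which only recognizes a plain `connectorsDirectory: value` line) and the helper names `read_connectors_directory` and `find_connector_archives` are hypothetical.

```python
from pathlib import Path
from typing import List


def read_connectors_directory(config_text: str, default: str = "./connectors") -> str:
    """Return the connectorsDirectory value from functions_worker.yml text.

    Naive scan (assumption): only a `connectorsDirectory: value` line is
    recognized; this is not a YAML parser. `default` is this sketch's fallback.
    """
    for line in config_text.splitlines():
        stripped = line.split("#", 1)[0].strip()  # drop trailing comments
        if stripped.startswith("connectorsDirectory:"):
            value = stripped.split(":", 1)[1].strip().strip("'\"")
            if value:
                return value
    return default


def find_connector_archives(directory: str) -> List[str]:
    """List the names of .nar files directly inside `directory`, sorted."""
    root = Path(directory)
    if not root.is_dir():
        return []
    return sorted(p.name for p in root.iterdir() if p.is_file() and p.suffix == ".nar")
```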

Configure a connector with a YAML file

To configure a connector, you need to provide a YAML configuration file when creating a connector.

The YAML configuration file tells Pulsar where to locate connectors and how to connect connectors with Pulsar topics.

Example 1

Below is a YAML configuration file for a Cassandra sink. It tells Pulsar:

  • Which Cassandra cluster to connect to

  • Which keyspace and columnFamily to use in Cassandra for collecting data

  • How to map Pulsar messages to the Cassandra table key and columns

tenant: public
namespace: default
name: cassandra-test-sink
...
# cassandra specific config
configs:
  roots: "localhost:9042"
  keyspace: "pulsar_test_keyspace"
  columnFamily: "pulsar_test_table"
  keyname: "key"
  columnName: "col"

Example 2

Below is a YAML configuration file for a Kafka source.

configs:
  bootstrapServers: "pulsar-kafka:9092"
  groupId: "test-pulsar-io"
  topic: "my-topic"
  sessionTimeoutMs: "10000"
  autoCommitEnabled: "false"

Example 3

Below is a YAML configuration file for a PostgreSQL JDBC sink.

configs:
  userName: "postgres"
  password: "password"
  jdbcUrl: "jdbc:postgresql://localhost:5432/test_jdbc"
  tableName: "test_jdbc"
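If you prefer not to keep a separate file, `pulsar-admin sinks create` also accepts the connector-specific `configs` map as an inline JSON string through `--sink-config` (sources use `--source-config`). The Python sketch below builds such an argument list for the PostgreSQL JDBC example; the sink name `jdbc-postgres-sink`, the sink type `jdbc-postgres`, and the input topic are hypothetical placeholders, so check them against your deployment.

```python
import json
from typing import Dict, List


def sink_create_args(tenant: str, namespace: str, name: str, sink_type: str,
                     inputs: List[str], configs: Dict[str, str]) -> List[str]:
    """Build a `pulsar-admin sinks create` command with inline connector configs.

    `configs` holds the same keys as the `configs:` section of the YAML file;
    it is serialized to JSON for the --sink-config flag.
    """
    return [
        "pulsar-admin", "sinks", "create",
        "--tenant", tenant,
        "--namespace", namespace,
        "--name", name,
        "--sink-type", sink_type,        # built-in connector type (assumed name)
        "--inputs", ",".join(inputs),    # comma-separated input topics
        "--sink-config", json.dumps(configs, sort_keys=True),
    ]


# Hypothetical values based on Example 3.
args = sink_create_args(
    "public", "default", "jdbc-postgres-sink", "jdbc-postgres",
    ["persistent://public/default/jdbc-input"],
    {
        "userName": "postgres",
        "password": "password",
        "jdbcUrl": "jdbc:postgresql://localhost:5432/test_jdbc",
        "tableName": "test_jdbc",
    },
)
```

The list can be passed to `subprocess.run(args, check=True)`. Credentials given inline like this can end up in shell history and process listings, so a config file is usually the safer place for a password.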

Get available connectors

Before using connectors, you can perform the following operations:

reload

If you add or delete a NAR file in the connectors folder, reload the available built-in connectors before using them.

Source

Use the reload subcommand.

pulsar-admin sources reload

For the latest and complete information, see Pulsar admin docs.

Sink

Use the reload subcommand.

pulsar-admin sinks reload

For the latest and complete information, see Pulsar admin docs.
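When NAR files change as part of a deployment script, both reloads can be run together. This Python sketch shells out to `pulsar-admin` (assumed to be on `PATH` and already configured with your admin service URL); the hypothetical `admin` parameter exists only so the binary can be swapped, for example for a dry run with `echo`.

```python
import subprocess
from typing import List


def reload_builtin_connectors(admin: str = "pulsar-admin") -> List[str]:
    """Reload built-in sources and sinks; return each command's stdout.

    Run this after adding or removing a NAR file in the connectors folder.
    """
    outputs = []
    for kind in ("sources", "sinks"):
        result = subprocess.run(
            [admin, kind, "reload"],
            capture_output=True, text=True,
            check=True,  # raise CalledProcessError if the command fails
        )
        outputs.append(result.stdout.strip())
    return outputs
```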

available

After reloading connectors (optional), you can get a list of available connectors.

Source

Use the available-sources subcommand.

pulsar-admin sources available-sources

Sink

Use the available-sinks subcommand.

pulsar-admin sinks available-sinks
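The same lists can also be fetched over HTTP. Assuming the v3 admin REST endpoints `GET /admin/v3/sources/builtinsources` and `GET /admin/v3/sinks/builtinsinks` and a hypothetical admin URL of `http://localhost:8080`, this Python sketch builds (but does not send) the request; verify the paths against the Pulsar REST API reference for your version.

```python
from urllib.request import Request

# Assumed v3 admin REST paths for built-in connector listings.
BUILTIN_PATHS = {
    "sources": "/admin/v3/sources/builtinsources",
    "sinks": "/admin/v3/sinks/builtinsinks",
}


def builtin_connectors_request(kind: str, admin_url: str = "http://localhost:8080") -> Request:
    """Build a GET request that lists built-in connectors of `kind` ("sources" or "sinks")."""
    if kind not in BUILTIN_PATHS:
        raise ValueError(f"kind must be 'sources' or 'sinks', got {kind!r}")
    return Request(admin_url.rstrip("/") + BUILTIN_PATHS[kind], method="GET")
```

Sending it with `urllib.request.urlopen(req)` and decoding the body with `json.load` should return the connector metadata, if the endpoint behaves as assumed.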

Run a connector

To run a connector, you can perform the following operations:

create

You can create a connector using Admin CLI, REST API, or Java admin API.

Source

Create a source connector.

Use the create subcommand.

pulsar-admin sources create options

For the latest and complete information, see Pulsar admin docs.

Sink

Create a sink connector.

Use the create subcommand.

pulsar-admin sinks create options

For the latest and complete information, see Pulsar admin docs.
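For scripted deployments it can help to assemble the `create` options in code. This Python sketch builds a `pulsar-admin sources create` command for the Kafka source configured earlier, pointing `--source-config-file` at a YAML file; the source name, destination topic, and file name are hypothetical.

```python
import shlex
from typing import List


def source_create_args(tenant: str, namespace: str, name: str, source_type: str,
                       destination_topic: str, config_file: str,
                       parallelism: int = 1) -> List[str]:
    """Build a `pulsar-admin sources create` command for a built-in source."""
    if parallelism < 1:
        raise ValueError("parallelism must be at least 1")
    return [
        "pulsar-admin", "sources", "create",
        "--tenant", tenant,
        "--namespace", namespace,
        "--name", name,
        "--source-type", source_type,                 # e.g. the built-in "kafka" source
        "--destination-topic-name", destination_topic,
        "--source-config-file", config_file,          # YAML file with the configs: map
        "--parallelism", str(parallelism),
    ]


args = source_create_args("public", "default", "kafka-test-source", "kafka",
                          "persistent://public/default/from-kafka", "kafka-source.yml")
print(shlex.join(args))  # shell-quoted command, handy for logging
```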

start

You can start a connector using Admin CLI or REST API.

Source

Start a source connector.

Use the start subcommand.

pulsar-admin sources start options

For the latest and complete information, see Pulsar admin docs.

Sink

Start a sink connector.

Use the start subcommand.

pulsar-admin sinks start options

For the latest and complete information, see Pulsar admin docs.
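For the REST route, this Python sketch builds a start request for a sink. It assumes the v3 endpoints `POST /admin/v3/sinks/{tenant}/{namespace}/{sinkName}/start` and, for a single instance, `.../{sinkName}/{instanceId}/start`, plus a hypothetical admin URL; the source endpoints are assumed to mirror these under `/admin/v3/sources/`.

```python
from typing import Optional
from urllib.parse import quote
from urllib.request import Request


def start_sink_request(tenant: str, namespace: str, name: str,
                       instance_id: Optional[int] = None,
                       admin_url: str = "http://localhost:8080") -> Request:
    """Build (but do not send) a POST request that starts a sink or one of its instances."""
    segments = [tenant, namespace, name]
    if instance_id is not None:
        segments.append(str(instance_id))  # start only this instance
    path = "/".join(quote(s, safe="") for s in segments)  # percent-encode each segment
    return Request(f"{admin_url.rstrip('/')}/admin/v3/sinks/{path}/start", method="POST")
```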

localrun

You can use the Admin CLI to run a connector locally instead of deploying it to a Pulsar cluster.

Source

Run a source connector locally.

Use the localrun subcommand.

pulsar-admin sources localrun options

For the latest and complete information, see Pulsar admin docs.

Sink

Run a sink connector locally.

Use the localrun subcommand.

pulsar-admin sinks localrun options

For the latest and complete information, see Pulsar admin docs.
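A local run stays in the foreground until it is interrupted, so a script or test harness typically starts it as a child process and stops it afterwards. In this Python sketch the archive path, input topic, and broker URL `pulsar://localhost:6650` are hypothetical, the defaults for tenant, namespace, and name follow the Cassandra example above, and the helper names are illustrative.

```python
import subprocess
from typing import List


def localrun_sink_args(archive: str, config_file: str, inputs: List[str],
                       name: str = "cassandra-test-sink", tenant: str = "public",
                       namespace: str = "default",
                       broker_url: str = "pulsar://localhost:6650") -> List[str]:
    """Build a `pulsar-admin sinks localrun` command."""
    return [
        "pulsar-admin", "sinks", "localrun",
        "--archive", archive,              # path to the connector NAR file
        "--tenant", tenant,
        "--namespace", namespace,
        "--name", name,
        "--inputs", ",".join(inputs),
        "--sink-config-file", config_file,
        "--broker-service-url", broker_url,
    ]


def stop_localrun(proc: subprocess.Popen, timeout: float = 10.0) -> int:
    """terminate() (SIGTERM on POSIX) and wait; kill the process if it does not exit in time."""
    proc.terminate()
    try:
        return proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()
        return proc.wait()
```

Usage: `proc = subprocess.Popen(localrun_sink_args(...))`, run your checks, then `stop_localrun(proc)`.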

Monitor a connector

To monitor a connector, you can perform the following operations:

get

You can get information about a connector using Admin CLI, REST API, or Java admin API.

Source

Get information about a source connector.

Use the get subcommand.

pulsar-admin sources get options

For the latest and complete information, see Pulsar admin docs.

Sink

Get information about a sink connector.

Use the get subcommand.

pulsar-admin sinks get options

For the latest and complete information, see Pulsar admin docs.
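`get` returns the connector's configuration. Assuming the CLI prints it as JSON (verify this for your version), a script can parse it directly. In this Python sketch the command runner `run` is injectable so it can be replaced in tests; the helper names are hypothetical.

```python
import json
import subprocess
from typing import Callable, Sequence


def run_admin(args: Sequence[str]) -> str:
    """Run a pulsar-admin command and return its stdout; raises CalledProcessError on failure."""
    return subprocess.run(list(args), capture_output=True, text=True, check=True).stdout


def get_sink_config(tenant: str, namespace: str, name: str,
                    run: Callable[[Sequence[str]], str] = run_admin) -> dict:
    """Fetch a sink's configuration with `pulsar-admin sinks get` and parse it.

    Assumes the command prints the configuration as a JSON object.
    """
    output = run(["pulsar-admin", "sinks", "get",
                  "--tenant", tenant, "--namespace", namespace, "--name", name])
    return json.loads(output)
```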

list

You can get the list of all running connectors using Admin CLI, REST API, or Java admin API.

Source

Get the list of all running source connectors.

Use the list subcommand.

pulsar-admin sources list options

For the latest and complete information, see Pulsar admin docs.

Sink

Get the list of all running sink connectors.

Use the list subcommand.

pulsar-admin sinks list options

For the latest and complete information, see Pulsar admin docs.
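To feed `list` output into other scripts, this Python sketch builds the command for one namespace and parses what it prints. It assumes the names are printed as a JSON array and, as a hedge, falls back to one name per line; the helper names are hypothetical.

```python
import json
from typing import List


def sink_list_args(tenant: str, namespace: str) -> List[str]:
    """Build a `pulsar-admin sinks list` command for one namespace."""
    return ["pulsar-admin", "sinks", "list", "--tenant", tenant, "--namespace", namespace]


def parse_connector_list(output: str) -> List[str]:
    """Turn `sinks list` / `sources list` output into a list of names."""
    text = output.strip()
    if not text:
        return []
    try:
        names = json.loads(text)  # assumed format: a JSON array of names
    except json.JSONDecodeError:
        names = None
    if isinstance(names, list):
        return [str(n) for n in names]
    # Fallback (assumption): one name per line.
    return [line.strip() for line in text.splitlines() if line.strip()]
```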

status

You can get the current status of a connector using Admin CLI, REST API, or Java admin API.

Source

Get the current status of a source connector.

Use the status subcommand.

pulsar-admin sources status options

For the latest and complete information, see Pulsar admin docs.

Sink

Get the current status of a Pulsar sink connector.

Use the status subcommand.

pulsar-admin sinks status options

For the latest and complete information, see Pulsar admin docs.
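Status output is a convenient way to decide whether a newly created or started connector is healthy. This Python sketch assumes the parsed status JSON contains `numInstances`, `numRunning`, and an `instances` list whose entries carry `instanceId` and `status.running`; treat those field names as assumptions to check against real output. `fetch_status` is any callable returning the parsed status, for example one that runs `pulsar-admin sinks status` and calls `json.loads` on stdout.

```python
import time
from typing import Callable, List, Optional


def instances_running(status: dict) -> bool:
    """True when the connector has at least one instance and all of them are running."""
    total = status.get("numInstances", 0)
    return total > 0 and status.get("numRunning", 0) == total


def not_running_instances(status: dict) -> List[Optional[int]]:
    """IDs of instances whose per-instance status does not report running."""
    return [
        inst.get("instanceId")
        for inst in status.get("instances", [])
        if not inst.get("status", {}).get("running", False)
    ]


def wait_until_running(fetch_status: Callable[[], dict], timeout: float = 60.0,
                       interval: float = 2.0,
                       sleep: Callable[[float], None] = time.sleep) -> bool:
    """Poll fetch_status() until all instances run or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while True:
        if instances_running(fetch_status()):
            return True
        if time.monotonic() >= deadline:
            return False
        sleep(interval)
```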

Update a connector

update

You can update a running connector using Admin CLI, REST API, or Java admin API.

Source

Update a running Pulsar source connector.

Use the update subcommand.

pulsar-admin sources update options

For the latest and complete information, see Pulsar admin docs.

Sink

Update a running Pulsar sink connector.

Use the update subcommand.

pulsar-admin sinks update options

For the latest and complete information, see Pulsar admin docs.
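An update usually changes only a few settings, such as parallelism. This Python sketch adds flags only for the fields you pass, so only those settings appear on the command line. The helper name is hypothetical; the flag names (`--parallelism`, `--sink-config-file`, `--inputs`) are standard `pulsar-admin sinks` options, but confirm them with `pulsar-admin sinks update --help`.

```python
from typing import List, Optional, Sequence


def sink_update_args(tenant: str, namespace: str, name: str,
                     parallelism: Optional[int] = None,
                     config_file: Optional[str] = None,
                     inputs: Optional[Sequence[str]] = None) -> List[str]:
    """Build a `pulsar-admin sinks update` command containing only the given changes."""
    args = ["pulsar-admin", "sinks", "update",
            "--tenant", tenant, "--namespace", namespace, "--name", name]
    if parallelism is not None:
        if parallelism < 1:
            raise ValueError("parallelism must be at least 1")
        args += ["--parallelism", str(parallelism)]
    if config_file is not None:
        args += ["--sink-config-file", config_file]
    if inputs:
        args += ["--inputs", ",".join(inputs)]  # comma-separated input topics
    return args
```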

Stop a connector

stop

You can stop a connector using Admin CLI, REST API, or Java admin API.

Source

Stop a source connector.

Use the stop subcommand.

pulsar-admin sources stop options

For the latest and complete information, see Pulsar admin docs.

Sink

Stop a sink connector.

Use the stop subcommand.

pulsar-admin sinks stop options

For the latest and complete information, see Pulsar admin docs.
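`stop` can target the whole connector or, with `--instance-id`, a single instance. This Python sketch builds either form for sources or sinks; the helper name `connector_stop_args` is hypothetical.

```python
from typing import List, Optional


def connector_stop_args(kind: str, tenant: str, namespace: str, name: str,
                        instance_id: Optional[int] = None) -> List[str]:
    """Build `pulsar-admin <kind> stop`; without instance_id, all instances are targeted."""
    if kind not in ("sources", "sinks"):
        raise ValueError(f"kind must be 'sources' or 'sinks', got {kind!r}")
    args = ["pulsar-admin", kind, "stop",
            "--tenant", tenant, "--namespace", namespace, "--name", name]
    if instance_id is not None:
        args += ["--instance-id", str(instance_id)]  # stop a single instance only
    return args
```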

Restart a connector

restart

You can restart a connector using Admin CLI, REST API, or Java admin API.

Source

Restart a source connector.

Use the restart subcommand.

pulsar-admin sources restart options

For the latest and complete information, see Pulsar admin docs.

Sink

Restart a sink connector.

Use the restart subcommand.

pulsar-admin sinks restart options

For the latest and complete information, see Pulsar admin docs.
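Restarting every instance at once can briefly interrupt processing on all of them. One alternative, sketched here in Python, is a rolling restart that restarts instances one at a time with `--instance-id` and optionally waits in between. This is an illustrative pattern, not a built-in Pulsar feature; `run` and `after_each` are hypothetical hooks (for example, `after_each` could poll `status` until the instance is running again).

```python
from typing import Callable, List, Optional, Sequence


def rolling_restart(tenant: str, namespace: str, name: str, num_instances: int,
                    run: Callable[[Sequence[str]], object],
                    after_each: Optional[Callable[[int], None]] = None) -> List[List[str]]:
    """Restart sink instances 0..num_instances-1 one at a time; return the commands issued."""
    issued = []
    for instance_id in range(num_instances):
        args = ["pulsar-admin", "sinks", "restart",
                "--tenant", tenant, "--namespace", namespace, "--name", name,
                "--instance-id", str(instance_id)]
        run(args)  # e.g. subprocess.run(args, check=True)
        issued.append(args)
        if after_each is not None:
            after_each(instance_id)  # e.g. wait until this instance reports running
    return issued
```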

Delete a connector

delete

You can delete a connector using Admin CLI, REST API, or Java admin API.

Source

Delete a source connector.

Use the delete subcommand.

pulsar-admin sources delete options

For the latest and complete information, see Pulsar admin docs.

Sink

Delete a sink connector.

Use the delete subcommand.

pulsar-admin sinks delete options

For the latest and complete information, see Pulsar admin docs.
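A deleted connector has to be created again to be restored, so scripts that remove several connectors often default to a dry run. This Python sketch builds one `pulsar-admin sinks delete` command per name and executes them only when `dry_run=False`; the helper name and the `run` hook are hypothetical.

```python
from typing import Callable, Iterable, List, Optional, Sequence


def delete_sinks(tenant: str, namespace: str, names: Iterable[str],
                 run: Optional[Callable[[Sequence[str]], object]] = None,
                 dry_run: bool = True) -> List[List[str]]:
    """Build (and, unless dry_run, execute) one `pulsar-admin sinks delete` per name."""
    commands = [
        ["pulsar-admin", "sinks", "delete",
         "--tenant", tenant, "--namespace", namespace, "--name", name]
        for name in names
    ]
    if not dry_run:
        if run is None:
            raise ValueError("run is required when dry_run is False")
        for command in commands:
            run(command)  # e.g. subprocess.run(command, check=True)
    return commands
```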