How to use Pulsar connectors
This guide describes how to use Pulsar connectors.
Install a connector
Pulsar bundles several builtin connectors used to move data in and out of commonly used systems (such as databases and messaging systems). Optionally, you can create and use your own non-builtin connectors.
When using a non-builtin connector, you need to specify the path of an archive file for the connector.
To set up a builtin connector, follow the instructions here.
After the setup, the builtin connector is automatically discovered by Pulsar brokers (or function-workers), so no additional installation steps are required.
Configure a connector
You can configure the following information:
Configure a default storage location for a connector
To configure a default folder for builtin connectors, set the connectorsDirectory parameter in the ./conf/functions_worker.yml configuration file.
Example
Set the ./connectors folder as the default storage location for builtin connectors.
########################
# Connectors
########################
connectorsDirectory: ./connectors
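To see which archives a broker or function worker could pick up from that folder, a small script can list its contents. The following is a minimal sketch, assuming connector archives are `.nar` files placed directly in the folder; the helper name `list_connector_archives` is made up for illustration and is not part of Pulsar.

```python
from pathlib import Path


def list_connector_archives(connectors_dir):
    """Return the sorted names of .nar files directly inside connectors_dir.

    Hypothetical helper: it only inspects the file system and does not
    talk to Pulsar. A missing directory yields an empty list.
    """
    root = Path(connectors_dir)
    if not root.is_dir():
        return []
    return sorted(
        entry.name
        for entry in root.iterdir()
        if entry.is_file() and entry.suffix == ".nar"
    )


if __name__ == "__main__":
    # "./connectors" mirrors the connectorsDirectory value shown above.
    for name in list_connector_archives("./connectors"):
        print(name)
```

Run it from the Pulsar installation directory so that the relative path resolves the same way as in the configuration file.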
Configure a connector with a YAML file
To configure a connector, you need to provide a YAML configuration file when creating a connector.
The YAML configuration file tells Pulsar where to locate connectors and how to connect connectors with Pulsar topics.
Example 1
Below is a YAML configuration file of a Cassandra sink, which tells Pulsar:
- Which Cassandra cluster to connect to
- Which keyspace and columnFamily to use in Cassandra for collecting data
- How to map Pulsar messages into Cassandra table key and columns
tenant: public
namespace: default
name: cassandra-test-sink
...
# cassandra specific config
configs:
  roots: "localhost:9042"
  keyspace: "pulsar_test_keyspace"
  columnFamily: "pulsar_test_table"
  keyname: "key"
  columnName: "col"
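Before handing such a file to Pulsar, it can help to check that the connector-specific keys are all present. The sketch below mirrors the Cassandra example as a Python dictionary and reports missing keys. Treating the five keys from the example as required is an assumption made for illustration, not the connector's documented contract, and `missing_keys` is a hypothetical helper.

```python
import json

# Assumption: the five keys shown in the Cassandra example are treated as required.
REQUIRED_CASSANDRA_KEYS = ("roots", "keyspace", "columnFamily", "keyname", "columnName")


def missing_keys(configs, required=REQUIRED_CASSANDRA_KEYS):
    """Return the required keys that are absent or empty in configs, in order."""
    return [key for key in required if not configs.get(key)]


# The same values as the YAML example, expressed as a dictionary.
sink = {
    "tenant": "public",
    "namespace": "default",
    "name": "cassandra-test-sink",
    "configs": {
        "roots": "localhost:9042",
        "keyspace": "pulsar_test_keyspace",
        "columnFamily": "pulsar_test_table",
        "keyname": "key",
        "columnName": "col",
    },
}

if __name__ == "__main__":
    problems = missing_keys(sink["configs"])
    if problems:
        print("missing keys:", ", ".join(problems))
    else:
        print(json.dumps(sink, indent=2))
```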
Example 2
Below is a YAML configuration file of a Kafka source.
configs:
  bootstrapServers: "pulsar-kafka:9092"
  groupId: "test-pulsar-io"
  topic: "my-topic"
  sessionTimeoutMs: "10000"
  autoCommitEnabled: "false"
Example 3
Below is a YAML configuration file of a MySQL JDBC sink.
configs:
  userName: "root"
  password: "jdbc"
  jdbcUrl: "jdbc:mysql://127.0.0.1:3306/test_jdbc"
  tableName: "test_jdbc"
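A malformed jdbcUrl is an easy mistake to make in this file. As a rough sanity check, the sketch below splits a URL of the shape used in the example into driver, host, port, and database. It assumes the simple `jdbc:<driver>://<host>:<port>/<database>` form only; `split_jdbc_url` is a hypothetical helper and does not cover every JDBC URL syntax.

```python
from urllib.parse import urlsplit


def split_jdbc_url(jdbc_url):
    """Split a jdbc:<driver>://<host>:<port>/<database> URL into its parts.

    Hypothetical helper for sanity checks; it handles only the simple
    form used in the example above.
    """
    prefix = "jdbc:"
    if not jdbc_url.startswith(prefix):
        raise ValueError("not a JDBC URL: " + repr(jdbc_url))
    parts = urlsplit(jdbc_url[len(prefix):])
    return {
        "driver": parts.scheme,
        "host": parts.hostname,
        "port": parts.port,
        "database": parts.path.lstrip("/"),
    }


if __name__ == "__main__":
    print(split_jdbc_url("jdbc:mysql://127.0.0.1:3306/test_jdbc"))
```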
Get available connectors
Before you start using connectors, you can perform the following operations:
reload
If you add or delete a nar file in a connector folder, reload the available builtin connectors before using them.
Source
Use the reload subcommand.
$ pulsar-admin sources reload
For more information, see here.
Sink
Use the reload subcommand.
$ pulsar-admin sinks reload
For more information, see here.
available
After reloading connectors (optional), you can get a list of available connectors.
Source
Use the available-sources subcommand.
$ pulsar-admin sources available-sources
Sink
Use the available-sinks subcommand.
$ pulsar-admin sinks available-sinks
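When these steps are scripted, the four commands above differ only in the connector kind. The sketch below builds them as argument lists without running anything; the function names are made up for illustration, and only the subcommands shown above are used.

```python
# Connector kinds as they appear in the pulsar-admin commands above.
KINDS = ("sources", "sinks")


def _check_kind(kind):
    """Reject anything other than the two connector kinds."""
    if kind not in KINDS:
        raise ValueError("kind must be one of %r, got %r" % (KINDS, kind))


def reload_command(kind):
    """Build the argument list for `pulsar-admin <kind> reload`."""
    _check_kind(kind)
    return ["pulsar-admin", kind, "reload"]


def available_command(kind):
    """Build the argument list for `pulsar-admin <kind> available-<kind>`."""
    _check_kind(kind)
    return ["pulsar-admin", kind, "available-" + kind]


if __name__ == "__main__":
    for kind in KINDS:
        print(" ".join(reload_command(kind)))
        print(" ".join(available_command(kind)))
```

On a machine where pulsar-admin is on the PATH, each list can be passed to `subprocess.run(cmd, check=True)` to execute it.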
Run a connector
To run a connector, you can perform the following operations:
create
You can create a connector using Admin CLI, REST API, or Java Admin API.
Source
Create a source connector.
- Admin CLI
- REST API
- Java Admin API
Send a POST request to this endpoint: POST /admin/v3/sources/:tenant/:namespace/:sourceName (operation: registerSource)
- Create a source connector with a local file.
  void createSource(SourceConfig sourceConfig,
                    String fileName)
              throws PulsarAdminException
  Parameter
  Name | Description
  sourceConfig | The source configuration object
  Exception
  Name | Description
  PulsarAdminException | Unexpected error
  For more information, see createSource.
- Create a source connector using a remote file with a URL from which fun-pkg can be downloaded.
  void createSourceWithUrl(SourceConfig sourceConfig,
                           String pkgUrl)
                    throws PulsarAdminException
  Supported URLs are http and file.
  Example
  - File: file:///dir/fileName.jar
  Parameter
  Parameter | Description
  sourceConfig | The source configuration object
  pkgUrl | URL from which pkg can be downloaded
  Exception
  Name | Description
  PulsarAdminException | Unexpected error
  For more information, see createSourceWithUrl.
Sink
Create a sink connector.
- Admin CLI
- REST API
- Java Admin API
Send a POST request to this endpoint: POST /admin/v3/sinks/:tenant/:namespace/:sinkName (operation: registerSink)
- Create a sink connector with a local file.
  void createSink(SinkConfig sinkConfig,
                  String fileName)
           throws PulsarAdminException
  Parameter
  Name | Description
  sinkConfig | The sink configuration object
  Exception
  Name | Description
  PulsarAdminException | Unexpected error
  For more information, see createSink.
- Create a sink connector using a remote file with a URL from which fun-pkg can be downloaded.
  void createSinkWithUrl(SinkConfig sinkConfig,
                         String pkgUrl)
                  throws PulsarAdminException
  Supported URLs are http and file.
  Example
  - File: file:///dir/fileName.jar
  Parameter
  Parameter | Description
  sinkConfig | The sink configuration object
  pkgUrl | URL from which pkg can be downloaded
  Exception
  Name | Description
  PulsarAdminException | Unexpected error
  For more information, see createSinkWithUrl.
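For the REST API route, the create endpoints for sources and sinks share one shape. The sketch below fills in the path placeholders, assuming the path ends at the connector name and that the trailing registerSource or registerSink in the endpoint listings names the operation rather than a path segment. The helper `create_endpoint` is hypothetical and only builds the path; it does not send a request.

```python
from urllib.parse import quote


def create_endpoint(kind, tenant, namespace, name):
    """Return the REST path for creating a source or sink.

    Hypothetical helper. kind is "sources" or "sinks"; the other
    arguments replace :tenant, :namespace and the connector name.
    """
    if kind not in ("sources", "sinks"):
        raise ValueError("kind must be 'sources' or 'sinks', got %r" % (kind,))
    # Percent-encode each segment so unusual names cannot alter the path.
    segments = [quote(part, safe="") for part in (tenant, namespace, name)]
    return "/admin/v3/" + kind + "/" + "/".join(segments)


if __name__ == "__main__":
    print("POST", create_endpoint("sinks", "public", "default", "cassandra-test-sink"))
```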
start
You can start a connector using Admin CLI or REST API.
Source
Start a source connector.
- Admin CLI
- REST API
- Start all instances of a source connector.
  Send a POST request to this endpoint: POST /admin/v3/sources/:tenant/:namespace/:sourceName/start (operation: startSource)
- Start a specified instance of a source connector.
  Send a POST request to this endpoint: POST /admin/v3/sources/:tenant/:namespace/:sourceName/:instanceId/start (operation: startSource)
Sink
Start a sink connector.
- Admin CLI
- REST API
- Start all instances of a sink connector.
  Send a POST request to this endpoint: POST /admin/v3/sinks/:tenant/:namespace/:sinkName/start (operation: startSink)
- Start a specified instance of a sink connector.
  Send a POST request to this endpoint: POST /admin/v3/sinks/:tenant/:namespace/:sinkName/:instanceId/start (operation: startSink)
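The start endpoints can be exercised from a script as well. The sketch below builds the POST request without sending it. It rests on several assumptions: the admin REST service is reachable at `http://localhost:8080`, sink endpoints live under `/admin/v3/sinks`, and the trailing startSource or startSink in the endpoint listings names the operation rather than a path segment. `start_request` and the connector name `my-source` are hypothetical.

```python
from urllib.request import Request

# Assumption: the admin REST service listens here; adjust for your deployment.
BASE_URL = "http://localhost:8080"


def start_request(kind, tenant, namespace, name, instance_id=None):
    """Build (but do not send) the POST request that starts a connector.

    With instance_id=None the path has no instance segment; otherwise
    the given instance ID is inserted before /start.
    """
    if kind not in ("sources", "sinks"):
        raise ValueError("kind must be 'sources' or 'sinks', got %r" % (kind,))
    path = "/admin/v3/%s/%s/%s/%s" % (kind, tenant, namespace, name)
    if instance_id is not None:
        path += "/%s" % (instance_id,)
    return Request(BASE_URL + path + "/start", method="POST")


if __name__ == "__main__":
    for req in (
        start_request("sinks", "public", "default", "cassandra-test-sink"),
        start_request("sources", "public", "default", "my-source", 0),
    ):
        print(req.get_method(), req.full_url)
```

Passing the returned object to `urllib.request.urlopen` would send the request, which only makes sense against a running cluster.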
localrun
Using the Admin CLI, you can run a connector locally rather than deploying it on a Pulsar cluster.