How to use Pulsar connectors
This guide describes how to use Pulsar connectors.
Install a connector
Pulsar bundles several built-in connectors for moving data in and out of commonly used systems (such as databases and messaging systems). You can also create and use your own non-built-in connectors.
When using a non-built-in connector, you need to specify the path of an archive file for the connector.
To set up a built-in connector, follow the setup instructions for that connector.
After setup, Pulsar brokers (or function workers) discover the built-in connector automatically, so no additional installation steps are required.
Configure a connector
You can configure the following information:
Configure a default storage location for a built-in connector
To configure a default folder for built-in connectors, set the connectorsDirectory parameter in the ./conf/functions_worker.yml configuration file.
Example
Set the ./connectors folder as the default storage location for built-in connectors.
########################
# Connectors
########################
connectorsDirectory: ./connectors
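Built-in connectors are picked up from the archives in this folder. The following Java snippet is a rough sketch and not Pulsar's own discovery code. It lists the .nar files at the top level of the configured directory. Pulsar also inspects the connector metadata packaged inside each archive, and this sketch does not do that. The class name ConnectorDirScan is hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class ConnectorDirScan {
    // Returns the sorted names of regular files ending in ".nar" directly inside the directory.
    // Subfolders are not searched (an assumption of this sketch).
    static List<String> listNarFiles(Path connectorsDirectory) throws IOException {
        try (Stream<Path> entries = Files.list(connectorsDirectory)) {
            return entries
                    .filter(Files::isRegularFile)
                    .map(p -> p.getFileName().toString())
                    .filter(name -> name.endsWith(".nar"))
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        // Defaults to the same value as connectorsDirectory in ./conf/functions_worker.yml.
        Path dir = Paths.get(args.length > 0 ? args[0] : "./connectors");
        if (!Files.isDirectory(dir)) {
            System.out.println("No connectors directory at " + dir.toAbsolutePath());
            return;
        }
        listNarFiles(dir).forEach(System.out::println);
    }
}
```

To scan a different location, pass the folder path as the first argument. This overrides the ./connectors default.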
Configure a connector with a YAML file
To configure a connector, you need to provide a YAML configuration file when creating a connector.
The YAML configuration file tells Pulsar where to locate connectors and how to connect connectors with Pulsar topics.
Example 1
Below is a YAML configuration file of a Cassandra sink, which tells Pulsar:
- Which Cassandra cluster to connect to
- What keyspace and columnFamily to use in Cassandra for collecting data
- How to map Pulsar messages into the Cassandra table key and columns
tenant: public
namespace: default
name: cassandra-test-sink
...
# cassandra specific config
configs:
    roots: "localhost:9042"
    keyspace: "pulsar_test_keyspace"
    columnFamily: "pulsar_test_table"
    keyname: "key"
    columnName: "col"
Example 2
Below is a YAML configuration file of a Kafka source.
configs:
    bootstrapServers: "pulsar-kafka:9092"
    groupId: "test-pulsar-io"
    topic: "my-topic"
    sessionTimeoutMs: "10000"
    autoCommitEnabled: "false"
Example 3
Below is a YAML configuration file of a PostgreSQL JDBC sink.
configs:
    userName: "postgres"
    password: "password"
    jdbcUrl: "jdbc:postgresql://localhost:5432/test_jdbc"
    tableName: "test_jdbc"
Prepare a connector
Before you start using connectors, you can perform the following operations:
reload
If you add or delete a NAR file in a connector folder, reload the available built-in connectors before using them.
Source
To reload source connectors, you can use the reload subcommand.
pulsar-admin sources reload
For the latest and complete information, see Pulsar admin docs.
Sink
To reload sink connectors, you can use the reload subcommand.
pulsar-admin sinks reload
For the latest and complete information, see Pulsar admin docs.
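You can automate the decision of when a reload is needed. The following minimal sketch assumes that you record the set of NAR file names in the connector folder before and after a change, for example with Files.list. It reports whether anything was added or removed. The class and method names are hypothetical. The sketch compares names only, so it does not detect a NAR that is replaced in place by a file with the same name.

```java
import java.util.Set;
import java.util.TreeSet;

final class NarChangeCheck {
    // Returns the NAR names present in exactly one of the two snapshots
    // (added or deleted), i.e. the symmetric difference.
    static Set<String> changedNars(Set<String> before, Set<String> after) {
        Set<String> changed = new TreeSet<>(before);
        changed.addAll(after);                  // union of both snapshots
        Set<String> unchanged = new TreeSet<>(before);
        unchanged.retainAll(after);             // names present in both
        changed.removeAll(unchanged);           // keep only added or deleted names
        return changed;
    }

    // A reload is needed whenever at least one NAR was added or deleted.
    static boolean needsReload(Set<String> before, Set<String> after) {
        return !changedNars(before, after).isEmpty();
    }

    public static void main(String[] args) {
        Set<String> before = Set.of("pulsar-io-kafka.nar");
        Set<String> after = Set.of("pulsar-io-kafka.nar", "pulsar-io-jdbc-postgres.nar");
        System.out.println(changedNars(before, after) + " reload=" + needsReload(before, after));
    }
}
```

If needsReload returns true, run pulsar-admin sources reload or pulsar-admin sinks reload, depending on which kind of connector changed.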
available
After reloading connectors (optional), you can get a list of available connectors.
Source
To get a list of source connectors, you can use the available-sources subcommand.
pulsar-admin sources available-sources
Sink
To get a list of sink connectors, you can use the available-sinks subcommand.
pulsar-admin sinks available-sinks
Run a connector
To run a connector, you can perform the following operations:
create
To create a connector, you can use the Admin CLI, REST API, or Java admin API.
Source
To create a source connector, you can use the following commands:
- Admin CLI
- REST API
- Java Admin API
Use the create subcommand.
pulsar-admin sources create options
For the latest and complete information, see Pulsar admin docs.
Send a POST request to this endpoint: POST /admin/v3/sources/{tenant}/{namespace}/{sourceName}
- Create a source connector with a local file.
  void createSource(SourceConfig sourceConfig,
                    String fileName)
      throws PulsarAdminException
  Parameter
  Name                    Description
  sourceConfig            The source configuration object
  Exception
  Name                    Description
  PulsarAdminException    Unexpected error
  For more information, see createSource.
- Create a source connector using a remote file with a URL from which fun-pkg can be downloaded.
  void createSourceWithUrl(SourceConfig sourceConfig,
                           String pkgUrl)
      throws PulsarAdminException
  Supported URLs are http and file.
  Example
  - File: file:///dir/fileName.jar
  Parameter
  Name                    Description
  sourceConfig            The source configuration object
  pkgUrl                  URL from which pkg can be downloaded
  Exception
  Name                    Description
  PulsarAdminException    Unexpected error
  For more information, see createSourceWithUrl.
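A client can check a package URL before calling createSourceWithUrl (or createSinkWithUrl). The following minimal sketch applies the rule stated above that http and file URLs are supported. It also accepts https, which is an assumption of this sketch. It rejects any other scheme and any bare path without a scheme. The class name PkgUrlCheck is hypothetical.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Locale;

final class PkgUrlCheck {
    // Returns true if pkgUrl uses a scheme listed as supported (http or file).
    // Assumption: https is treated like http.
    static boolean isSupportedPkgUrl(String pkgUrl) {
        String scheme;
        try {
            scheme = new URI(pkgUrl).getScheme();
        } catch (URISyntaxException e) {
            return false; // not a syntactically valid URI
        }
        if (scheme == null) {
            return false; // a bare path such as /dir/fileName.jar has no scheme
        }
        switch (scheme.toLowerCase(Locale.ROOT)) {
            case "http":
            case "https":
            case "file":
                return true;
            default:
                return false;
        }
    }

    public static void main(String[] args) {
        for (String url : new String[] {"file:///dir/fileName.jar", "ftp://host/fileName.jar"}) {
            System.out.println(url + " -> " + isSupportedPkgUrl(url));
        }
    }
}
```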
Sink
To create a sink connector, you can use the following commands:
- Admin CLI
- REST API
- Java Admin API
Use the create subcommand.
pulsar-admin sinks create options
For the latest and complete information, see Pulsar admin docs.
Send a POST request to this endpoint: POST /admin/v3/sinks/{tenant}/{namespace}/{sinkName}
- Create a sink connector with a local file.
  void createSink(SinkConfig sinkConfig,
                  String fileName)
      throws PulsarAdminException
  Parameter
  Name                    Description
  sinkConfig              The sink configuration object
  Exception
  Name                    Description
  PulsarAdminException    Unexpected error
  For more information, see createSink.
- Create a sink connector using a remote file with a URL from which fun-pkg can be downloaded.
  void createSinkWithUrl(SinkConfig sinkConfig,
                         String pkgUrl)
      throws PulsarAdminException
  Supported URLs are http and file.
  Example
  - File: file:///dir/fileName.jar
  Parameter
  Name                    Description
  sinkConfig              The sink configuration object
  pkgUrl                  URL from which pkg can be downloaded
  Exception
  Name                    Description
  PulsarAdminException    Unexpected error
  For more information, see createSinkWithUrl.
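The REST endpoints for creating connectors follow one pattern for sources and sinks: POST /admin/v3/{sources|sinks}/{tenant}/{namespace}/{name}. The following sketch builds only the endpoint URI from an admin service URL. It does not build the request body, which carries the connector configuration and package. See the REST API reference for the exact form fields of the body. The class name and the http://localhost:8080 address are illustrative assumptions.

```java
import java.net.URI;
import java.util.Locale;

final class CreateEndpoints {
    enum Kind { SOURCES, SINKS }

    // Builds {adminUrl}/admin/v3/{sources|sinks}/{tenant}/{namespace}/{name}.
    static URI createEndpoint(String adminUrl, Kind kind, String tenant,
                              String namespace, String name) {
        String base = adminUrl.replaceAll("/+$", ""); // drop trailing slashes
        String path = String.join("/", "admin", "v3",
                kind.name().toLowerCase(Locale.ROOT), tenant, namespace, name);
        return URI.create(base + "/" + path);
    }

    public static void main(String[] args) {
        // Hypothetical admin URL; use your cluster's web service address.
        System.out.println(createEndpoint("http://localhost:8080", Kind.SINKS,
                "public", "default", "cassandra-test-sink"));
    }
}
```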
start
To start a connector, you can use Admin CLI or REST API.
Source
To start a source connector, you can use the following commands:
- Admin CLI
- REST API
Use the start subcommand.
pulsar-admin sources start options
For the latest and complete information, see Pulsar admin docs.
- Start all instances of a source connector.
  Send a POST request to this endpoint: POST /admin/v3/sources/{tenant}/{namespace}/{sourceName}/start
- Start a specific instance of a source connector.
  Send a POST request to this endpoint: POST /admin/v3/sources/{tenant}/{namespace}/{sourceName}/{instanceId}/start
Sink
To start a sink connector, you can use the following commands:
- Admin CLI
- REST API
Use the start subcommand.
pulsar-admin sinks start options
For the latest and complete information, see Pulsar admin docs.
- Start all instances of a sink connector.
  Send a POST request to this endpoint: POST /admin/v3/sinks/{tenant}/{namespace}/{sinkName}/start
- Start a specific instance of a sink connector.
  Send a POST request to this endpoint: POST /admin/v3/sinks/{tenant}/{namespace}/{sinkName}/{instanceId}/start
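The two start endpoints differ only in the optional {instanceId} path segment. The following sketch uses the JDK's java.net.http client to build the POST request for either case without sending it. The class name and the example admin URL are assumptions. Sending the request requires a running cluster, as the comment in main shows.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.OptionalInt;

final class StartRequests {
    // Builds the POST request that starts a connector. kind must be "sources" or "sinks".
    // Without an instance ID the path ends in /{name}/start (all instances);
    // with one it ends in /{name}/{instanceId}/start (that instance only).
    static HttpRequest startRequest(String adminUrl, String kind, String tenant,
                                    String namespace, String name, OptionalInt instanceId) {
        if (!kind.equals("sources") && !kind.equals("sinks")) {
            throw new IllegalArgumentException("kind must be sources or sinks: " + kind);
        }
        StringBuilder url = new StringBuilder(adminUrl.replaceAll("/+$", ""))
                .append("/admin/v3/").append(kind)
                .append('/').append(tenant)
                .append('/').append(namespace)
                .append('/').append(name);
        instanceId.ifPresent(id -> url.append('/').append(id));
        url.append("/start");
        return HttpRequest.newBuilder(URI.create(url.toString()))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) {
        HttpRequest all = startRequest("http://localhost:8080", "sources",
                "public", "default", "kafka-source", OptionalInt.empty());
        HttpRequest one = startRequest("http://localhost:8080", "sinks",
                "public", "default", "cassandra-test-sink", OptionalInt.of(0));
        System.out.println(all.method() + " " + all.uri());
        System.out.println(one.method() + " " + one.uri());
        // To send (needs a running cluster):
        // java.net.http.HttpClient.newHttpClient().send(all, java.net.http.HttpResponse.BodyHandlers.ofString());
    }
}
```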
localrun
To run a connector locally rather than deploying it on a Pulsar cluster, you can use the Admin CLI.
Source
To run a source connector locally, you can use the following command:
- Admin CLI
Use the localrun subcommand.
pulsar-admin sources localrun options
For the latest and complete information, see Pulsar admin docs.
Sink
To run a sink connector locally, you can use the following command:
- Admin CLI
Use the localrun subcommand.
pulsar-admin sinks localrun options
For the latest and complete information, see Pulsar admin docs.
Run a Pulsar Function before a sink connector
You can run a Pulsar Function in memory before a sink connector. For details, see PIP 193: Sink preprocessing Function.
Running a Pulsar Function in memory before a sink connector provides lower latency and less I/O and disk consumption than routing messages through an intermediate topic.
Use the --transform-function, --transform-function-classname and --transform-function-config options when creating the sink connector to configure the transform Function to run.
For the latest and complete information, see Pulsar admin sinks command docs.
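The following sketch shows how the three transform options fit into a pulsar-admin sinks create invocation. The Java code builds the argument list for ProcessBuilder, which passes each argument as-is, so the JSON config needs no shell quoting. The --tenant, --namespace, --name, --archive, and --inputs options are standard pulsar-admin sinks create options. Every value in main, including the transform package path and class name, is a hypothetical placeholder, and so is the class name SinkCreateArgs.

```java
import java.util.ArrayList;
import java.util.List;

final class SinkCreateArgs {
    // Builds the argument list for `pulsar-admin sinks create` with a transform Function.
    // transformClassName and transformConfigJson may be null; the matching option is then omitted.
    static List<String> withTransform(String tenant, String namespace, String name,
                                      String archive, String inputTopic,
                                      String transformFunction,
                                      String transformClassName,
                                      String transformConfigJson) {
        List<String> args = new ArrayList<>(List.of(
                "pulsar-admin", "sinks", "create",
                "--tenant", tenant,
                "--namespace", namespace,
                "--name", name,
                "--archive", archive,
                "--inputs", inputTopic,
                "--transform-function", transformFunction));
        if (transformClassName != null) {
            args.add("--transform-function-classname");
            args.add(transformClassName);
        }
        if (transformConfigJson != null) {
            args.add("--transform-function-config");
            args.add(transformConfigJson);
        }
        return args;
    }

    public static void main(String[] args) {
        // Every value below is a hypothetical placeholder.
        List<String> cmd = withTransform("public", "default", "cassandra-test-sink",
                "connectors/pulsar-io-cassandra.nar", "persistent://public/default/input",
                "functions/my-transform.nar", "com.example.MyTransform", "{\"key\":\"value\"}");
        System.out.println(String.join(" ", cmd));
        // With pulsar-admin on the PATH: new ProcessBuilder(cmd).inheritIO().start();
    }
}
```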
Monitor a connector
To monitor a connector, you can perform the following operations:
get
To get information about a connector, you can use the Admin CLI, REST API, or Java admin API.