How to use Pulsar connectors
This guide describes how to use Pulsar connectors.
Install a connector
Pulsar bundles several built-in connectors that move data in and out of commonly used systems (such as databases and messaging systems). Optionally, you can create and use your own non-built-in connectors.
When using a non-built-in connector, you need to specify the path of an archive file for the connector.
To set up a built-in connector, follow the instructions.
After the setup, the built-in connector is automatically discovered by Pulsar brokers (or function-workers), so no additional installation steps are required.
Configure a connector
You can configure the following information:
Configure a default storage location for a built-in connector
To configure a default folder for built-in connectors, set the connectorsDirectory parameter in the ./conf/functions_worker.yml configuration file.
Example
Set the ./connectors folder as the default storage location for built-in connectors.
########################
# Connectors
########################
connectorsDirectory: ./connectors
Configure a connector with a YAML file
To configure a connector, you need to provide a YAML configuration file when creating a connector.
The YAML configuration file tells Pulsar where to locate connectors and how to connect connectors with Pulsar topics.
Example 1
Below is a YAML configuration file of a Cassandra sink, which tells Pulsar:
- Which Cassandra cluster to connect to
- Which keyspace and columnFamily to use in Cassandra for collecting data
- How to map Pulsar messages into the Cassandra table key and columns
tenant: public
namespace: default
name: cassandra-test-sink
...
# cassandra specific config
configs:
    roots: "localhost:9042"
    keyspace: "pulsar_test_keyspace"
    columnFamily: "pulsar_test_table"
    keyname: "key"
    columnName: "col"
Example 2
Below is a YAML configuration file of a Kafka source.
configs:
   bootstrapServers: "pulsar-kafka:9092"
   groupId: "test-pulsar-io"
   topic: "my-topic"
   sessionTimeoutMs: "10000"
   autoCommitEnabled: "false"
Example 3
Below is a YAML configuration file of a PostgreSQL JDBC sink.
configs:
   userName: "postgres"
   password: "password"
   jdbcUrl: "jdbc:postgresql://localhost:5432/test_jdbc"
   tableName: "test_jdbc"
Prepare a connector
Before you start using connectors, you can perform the following operations:
reload
If you add or delete a NAR file in a connector folder, reload the available built-in connectors before using them.
Source
To reload source connectors, you can use the reload subcommand.
pulsar-admin sources reload
For the latest and complete information, see Pulsar admin docs.
Sink
To reload sink connectors, you can use the reload subcommand.
pulsar-admin sinks reload
For the latest and complete information, see Pulsar admin docs.
available
After reloading connectors (optional), you can get a list of available connectors.
Source
To get a list of source connectors, you can use the available-sources subcommand.
pulsar-admin sources available-sources
Sink
To get a list of sink connectors, you can use the available-sinks subcommand.
pulsar-admin sinks available-sinks
Run a connector
To run a connector, you can perform the following operations:
create
To create a connector, you can use Admin CLI, REST API, or Java Admin API.
Source
To create a source connector, you can use the following commands:
- Admin CLI
- REST API
- Java Admin API
Use the create subcommand.
pulsar-admin sources create options
For the latest and complete information, see Pulsar admin docs.
Send a POST request to this endpoint: POST /admin/v3/sources/{tenant}/{namespace}/{sourceName}
- Create a source connector with a local file.
void createSource(SourceConfig sourceConfig,
                  String fileName)
           throws PulsarAdminException
Parameter
| Name | Description |
|---|---|
| sourceConfig | The source configuration object |
Exception
| Name | Description |
|---|---|
| PulsarAdminException | Unexpected error |
For more information, see createSource.
- Create a source connector using a remote file with a URL from which fun-pkg can be downloaded.
void createSourceWithUrl(SourceConfig sourceConfig,
                         String pkgUrl)
                  throws PulsarAdminException
Supported URLs are http and file.
Example
- File: file:///dir/fileName.jar
Parameter
| Name | Description |
|---|---|
| sourceConfig | The source configuration object |
| pkgUrl | URL from which pkg can be downloaded |
Exception
| Name | Description |
|---|---|
| PulsarAdminException | Unexpected error |
For more information, see createSourceWithUrl.
Sink
To create a sink connector, you can use the following commands:
- Admin CLI
- REST API
- Java Admin API
Use the create subcommand.
pulsar-admin sinks create options
For the latest and complete information, see Pulsar admin docs.
Send a POST request to this endpoint: POST /admin/v3/sinks/{tenant}/{namespace}/{sinkName}
- Create a sink connector with a local file.
void createSink(SinkConfig sinkConfig,
                String fileName)
         throws PulsarAdminException
Parameter
| Name | Description |
|---|---|
| sinkConfig | The sink configuration object |
Exception
| Name | Description |
|---|---|
| PulsarAdminException | Unexpected error |
For more information, see createSink.
- Create a sink connector using a remote file with a URL from which fun-pkg can be downloaded.
void createSinkWithUrl(SinkConfig sinkConfig,
                       String pkgUrl)
                throws PulsarAdminException
Supported URLs are http and file.
Example
- File: file:///dir/fileName.jar
Parameter
| Name | Description |
|---|---|
| sinkConfig | The sink configuration object |
| pkgUrl | URL from which pkg can be downloaded |
Exception
| Name | Description |
|---|---|
| PulsarAdminException | Unexpected error |
For more information, see createSinkWithUrl.
start
To start a connector, you can use Admin CLI or REST API.
Source
To start a source connector, you can use the following commands.
- Admin CLI
- REST API
Use the start subcommand.
pulsar-admin sources start options
For the latest and complete information, see Pulsar admin docs.
- Start all instances of a source connector. Send a POST request to this endpoint: POST /admin/v3/sources/{tenant}/{namespace}/{sourceName}/start
- Start a specified instance of a source connector. Send a POST request to this endpoint: POST /admin/v3/sources/{tenant}/{namespace}/{sourceName}/{instanceId}/start
Sink
To start a sink connector, you can use the following commands:
- Admin CLI
- REST API
Use the start subcommand.
pulsar-admin sinks start options
For the latest and complete information, see Pulsar admin docs.
- Start all instances of a sink connector. Send a POST request to this endpoint: POST /admin/v3/sinks/{tenant}/{namespace}/{sinkName}/start
- Start a specified instance of a sink connector. Send a POST request to this endpoint: POST /admin/v3/sinks/{tenant}/{namespace}/{sinkName}/{instanceId}/start
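As a rough illustration of the start endpoints above, the following sketch builds (but does not send) the corresponding requests with the JDK's `java.net.http` API. The class name `StartConnectorRequests`, the `http://localhost:8080` admin URL, and the connector names are assumptions made for this example; they are not part of Pulsar.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class StartConnectorRequests {
    // Assumed admin service URL; replace it with the address of your deployment.
    static final String ADMIN_URL = "http://localhost:8080";

    // kind is "sources" or "sinks"; this targets every instance of the connector.
    static HttpRequest startAll(String kind, String tenant, String namespace, String name) {
        return post(ADMIN_URL + "/admin/v3/" + kind + "/" + tenant + "/" + namespace
                + "/" + name + "/start");
    }

    // Same endpoint with an {instanceId} segment: targets a single instance.
    static HttpRequest startInstance(String kind, String tenant, String namespace,
                                     String name, int instanceId) {
        return post(ADMIN_URL + "/admin/v3/" + kind + "/" + tenant + "/" + namespace
                + "/" + name + "/" + instanceId + "/start");
    }

    // The start endpoints take no request body, so an empty POST is built.
    private static HttpRequest post(String url) {
        return HttpRequest.newBuilder(URI.create(url))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) {
        HttpRequest all = startAll("sources", "public", "default", "my-source");
        HttpRequest one = startInstance("sinks", "public", "default", "my-sink", 0);
        System.out.println(all.method() + " " + all.uri());
        System.out.println(one.method() + " " + one.uri());
    }
}
```

To send a request, you could pass it to `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`. Authentication, if your cluster requires it, is not covered by this sketch.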
localrun
To run a connector locally rather than deploying it on a Pulsar cluster, you can use Admin CLI.
Source
To run a source connector locally, you can use the following command:
- Admin CLI
Use the localrun subcommand.
pulsar-admin sources localrun options
For the latest and complete information, see Pulsar admin docs.
Sink
To run a sink connector locally, you can use the following command:
- Admin CLI
Use the localrun subcommand.
pulsar-admin sinks localrun options
For the latest and complete information, see Pulsar admin docs.
Run a Pulsar Function before a sink connector
You can run a Pulsar Function in memory before a sink connector. For details, see PIP 193: Sink preprocessing Function.
Running a Pulsar Function in memory before a sink connector provides lower latency and consumes less I/O and disk space than going through an intermediate topic.
Use the --transform-function, --transform-function-classname and --transform-function-config options when creating the sink connector to configure the transform Function to run.
For the latest and complete information, see Pulsar admin sinks command docs.
Monitor a connector
To monitor a connector, you can perform the following operations:
get
To get the information of a connector, you can use Admin CLI, REST API, or Java Admin API.
Source
To get the information of a source connector, you can use the following commands:
- Admin CLI
- REST API
- Java Admin API
Use the get subcommand.
pulsar-admin sources get options
For the latest and complete information, see Pulsar admin docs.
Send a GET request to this endpoint: GET /admin/v3/sources/{tenant}/{namespace}/{sourceName}
SourceConfig getSource(String tenant,
                       String namespace,
                       String source)
                throws PulsarAdminException
Example
This is a sourceConfig.
{
 "tenant": "tenantName",
 "namespace": "namespaceName",
 "name": "sourceName",
 "className": "className",
 "topicName": "topicName",
 "configs": {},
 "parallelism": 1,
 "processingGuarantees": "ATLEAST_ONCE",
 "resources": {
   "cpu": 1.0,
   "ram": 1073741824,
   "disk": 10737418240
 }
}
This is a sourceConfig example.
{
 "tenant": "public",
 "namespace": "default",
 "name": "debezium-mysql-source",
 "className": "org.apache.pulsar.io.debezium.mysql.DebeziumMysqlSource",
 "topicName": "debezium-mysql-topic",
 "configs": {
   "database.user": "debezium",
   "database.server.id": "184054",
   "database.server.name": "dbserver1",
   "database.port": "3306",
   "database.hostname": "localhost",
   "database.password": "dbz",
   "database.history.pulsar.service.url": "pulsar://127.0.0.1:6650",
   "value.converter": "org.apache.kafka.connect.json.JsonConverter",
   "database.whitelist": "inventory",
   "key.converter": "org.apache.kafka.connect.json.JsonConverter",
   "database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory",
   "pulsar.service.url": "pulsar://127.0.0.1:6650",
   "database.history.pulsar.topic": "history-topic2"
 },
 "parallelism": 1,
 "processingGuarantees": "ATLEAST_ONCE",
 "resources": {
   "cpu": 1.0,
   "ram": 1073741824,
   "disk": 10737418240
 }
}
Exception
| Exception name | Description | 
|---|---|
| PulsarAdminException.NotAuthorizedException | You don't have the admin permission | 
| PulsarAdminException.NotFoundException | Cluster doesn't exist | 
| PulsarAdminException | Unexpected error | 
For more information, see getSource.
Sink
To get the information of a sink connector, you can use the following commands:
- Admin CLI
- REST API
- Java Admin API
Use the get subcommand.
pulsar-admin sinks get options
For the latest and complete information, see Pulsar admin docs.
Send a GET request to this endpoint: GET /admin/v3/sinks/{tenant}/{namespace}/{sinkName}
SinkConfig getSink(String tenant,
                   String namespace,
                   String sink)
            throws PulsarAdminException
Example
This is a sinkConfig.
{
 "tenant": "tenantName",
 "namespace": "namespaceName",
 "name": "sinkName",
 "className": "className",
 "inputSpecs": {
   "topicName": {
     "isRegexPattern": false
   }
 },
 "configs": {},
 "parallelism": 1,
 "processingGuarantees": "ATLEAST_ONCE",
 "retainOrdering": false,
 "autoAck": true
}
This is a sinkConfig example.
{
  "tenant": "public",
  "namespace": "default",
  "name": "pulsar-postgres-jdbc-sink",
  "className": "org.apache.pulsar.io.jdbc.PostgresJdbcAutoSchemaSink",
  "inputSpecs": {
  "pulsar-postgres-jdbc-sink-topic": {
     "isRegexPattern": false
    }
  },
  "configs": {
    "password": "password",
    "jdbcUrl": "jdbc:postgresql://localhost:5432/pulsar_postgres_jdbc_sink",
    "userName": "postgres",
    "tableName": "pulsar_postgres_jdbc_sink"
  },
  "parallelism": 1,
  "processingGuarantees": "ATLEAST_ONCE",
  "retainOrdering": false,
  "autoAck": true
}
Parameter description
| Name | Description | 
|---|---|
| tenant | Tenant name | 
| namespace | Namespace name | 
| sink | Sink name | 
For more information, see getSink.
list
To get the list of all running connectors, you can use Admin CLI, REST API, or Java Admin API.
Source
To get the list of all running source connectors, you can use the following commands:
- Admin CLI
- REST API
- Java Admin API
Use the list subcommand.
pulsar-admin sources list options
For the latest and complete information, see Pulsar admin docs.
Send a GET request to this endpoint: GET /admin/v3/sources/{tenant}/{namespace}
List<String> listSources(String tenant,
                         String namespace)
                  throws PulsarAdminException
Response example
["f1", "f2", "f3"]
Exception
| Exception name | Description | 
|---|---|
| PulsarAdminException.NotAuthorizedException | You don't have the admin permission | 
| PulsarAdminException | Unexpected error | 
For more information, see listSources.
Sink
To get the list of all running sink connectors, you can use the following commands:
- Admin CLI
- REST API
- Java Admin API
Use the list subcommand.
pulsar-admin sinks list options
For the latest and complete information, see Pulsar admin docs.
Send a GET request to this endpoint: GET /admin/v3/sinks/{tenant}/{namespace}
List<String> listSinks(String tenant,
                       String namespace)
                throws PulsarAdminException
Response example
["f1", "f2", "f3"]
Exception
| Exception name | Description | 
|---|---|
| PulsarAdminException.NotAuthorizedException | You don't have the admin permission | 
| PulsarAdminException | Unexpected error | 
For more information, see listSinks.
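The list endpoints return a JSON array of connector names, as in the response example. The following sketch builds the GET request and reads such an array with a deliberately naive parser. The class name `ListConnectorsSketch`, the `http://localhost:8080` admin URL, and the parser itself are assumptions for illustration; the parser only handles names without commas, quotes, or escape sequences, so a real client should use a JSON library.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.ArrayList;
import java.util.List;

final class ListConnectorsSketch {
    // Assumed admin service URL; replace it with the address of your deployment.
    static final String ADMIN_URL = "http://localhost:8080";

    // kind is "sources" or "sinks".
    static HttpRequest listRequest(String kind, String tenant, String namespace) {
        String url = ADMIN_URL + "/admin/v3/" + kind + "/" + tenant + "/" + namespace;
        return HttpRequest.newBuilder(URI.create(url)).GET().build();
    }

    // Naive parser for a flat JSON array of simple strings, e.g. ["f1", "f2"].
    static List<String> parseNames(String json) {
        String body = json.trim();
        body = body.substring(1, body.length() - 1).trim(); // drop '[' and ']'
        List<String> names = new ArrayList<>();
        if (body.isEmpty()) {
            return names; // empty array: nothing is running
        }
        for (String item : body.split(",")) {
            String quoted = item.trim();
            names.add(quoted.substring(1, quoted.length() - 1)); // drop the quotes
        }
        return names;
    }

    public static void main(String[] args) {
        HttpRequest request = listRequest("sinks", "public", "default");
        System.out.println(request.method() + " " + request.uri());
        System.out.println(parseNames("[\"f1\", \"f2\", \"f3\"]"));
    }
}
```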
status
To get the current status of a connector, you can use Admin CLI, REST API, or Java Admin API.
Source
To get the current status of a source connector, you can use the following commands:
- Admin CLI
- REST API
- Java Admin API
Use the status subcommand.
pulsar-admin sources status options
For the latest and complete information, see Pulsar admin docs.
- Get the current status of all instances of a source connector. Send a GET request to this endpoint: GET /admin/v3/sources/{tenant}/{namespace}/{sourceName}/status
- Get the current status of a specified instance of a source connector. Send a GET request to this endpoint: GET /admin/v3/sources/{tenant}/{namespace}/{sourceName}/{instanceId}/status
- Get the current status of all instances of a source connector.
SourceStatus getSourceStatus(String tenant,
                             String namespace,
                             String source)
                      throws PulsarAdminException
Parameter
| Name | Description |
|---|---|
| tenant | Tenant name |
| namespace | Namespace name |
| source | Source name |
Exception
| Name | Description |
|---|---|
| PulsarAdminException | Unexpected error |
For more information, see getSourceStatus.
- Get the current status of a specified instance of a source connector.
SourceStatus.SourceInstanceStatus.SourceInstanceStatusData getSourceStatus(String tenant,
        String namespace,
        String source,
        int id)
    throws PulsarAdminException
Parameter
| Name | Description |
|---|---|
| tenant | Tenant name |
| namespace | Namespace name |
| source | Source name |
| id | Source instance ID |
Exception
| Name | Description |
|---|---|
| PulsarAdminException | Unexpected error |
For more information, see getSourceStatus.
Sink
To get the current status of a sink connector, you can use the following commands:
- Admin CLI
- REST API
- Java Admin API
Use the status subcommand.
pulsar-admin sinks status options
For the latest and complete information, see Pulsar admin docs.
- Get the current status of all instances of a sink connector. Send a GET request to this endpoint: GET /admin/v3/sinks/{tenant}/{namespace}/{sinkName}/status
- Get the current status of a specified instance of a sink connector. Send a GET request to this endpoint: GET /admin/v3/sinks/{tenant}/{namespace}/{sinkName}/{instanceId}/status
- Get the current status of all instances of a sink connector.
SinkStatus getSinkStatus(String tenant,
                         String namespace,
                         String sink)
                  throws PulsarAdminException
Parameter
| Name | Description |
|---|---|
| tenant | Tenant name |
| namespace | Namespace name |
| sink | Sink name |
Exception
| Name | Description |
|---|---|
| PulsarAdminException | Unexpected error |
For more information, see getSinkStatus.
- Get the current status of a specified instance of a sink connector.
SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkStatus(String tenant,
        String namespace,
        String sink,
        int id)
    throws PulsarAdminException
Parameter
| Name | Description |
|---|---|
| tenant | Tenant name |
| namespace | Namespace name |
| sink | Sink name |
| id | Sink instance ID |
Exception
| Name | Description |
|---|---|
| PulsarAdminException | Unexpected error |
For more information, see getSinkStatusWithInstanceID.
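The connector-level and instance-level status endpoints differ only by an optional `{instanceId}` path segment. The sketch below models that with an `OptionalInt` and builds the GET request without sending it. The class name `ConnectorStatusRequests`, the `http://localhost:8080` admin URL, the `Accept` header, and the connector names are assumptions for this example.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.OptionalInt;

final class ConnectorStatusRequests {
    // Assumed admin service URL; replace it with the address of your deployment.
    static final String ADMIN_URL = "http://localhost:8080";

    // kind is "sources" or "sinks"; an empty instanceId asks for all instances.
    static HttpRequest statusRequest(String kind, String tenant, String namespace,
                                     String name, OptionalInt instanceId) {
        StringBuilder url = new StringBuilder(ADMIN_URL)
                .append("/admin/v3/").append(kind)
                .append('/').append(tenant)
                .append('/').append(namespace)
                .append('/').append(name);
        if (instanceId.isPresent()) {
            url.append('/').append(instanceId.getAsInt());
        }
        url.append("/status");
        return HttpRequest.newBuilder(URI.create(url.toString()))
                .GET()
                .header("Accept", "application/json") // assumed: JSON response wanted
                .build();
    }

    public static void main(String[] args) {
        HttpRequest all = statusRequest("sources", "public", "default",
                "my-source", OptionalInt.empty());
        HttpRequest one = statusRequest("sinks", "public", "default",
                "my-sink", OptionalInt.of(0));
        System.out.println(all.method() + " " + all.uri());
        System.out.println(one.method() + " " + one.uri());
    }
}
```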
Update a connector
update
To update a running connector, you can use Admin CLI, REST API, or Java Admin API.
Source
To update a running Pulsar source connector, you can use the following commands:
- Admin CLI
- REST API
- Java Admin API
Use the update subcommand.
pulsar-admin sources update options
For the latest and complete information, see Pulsar admin docs.
Send a PUT request to this endpoint: PUT /admin/v3/sources/{tenant}/{namespace}/{sourceName}
- Update a running source connector with a local file.
void updateSource(SourceConfig sourceConfig,
                  String fileName)
           throws PulsarAdminException
Parameter
| Name | Description |
|---|---|
| sourceConfig | The source configuration object |
Exception
| Name | Description |
|---|---|
| PulsarAdminException.NotAuthorizedException | You don't have the admin permission |
| PulsarAdminException.NotFoundException | Cluster doesn't exist |
| PulsarAdminException | Unexpected error |
For more information, see updateSource.
- Update a source connector using a remote file with a URL from which fun-pkg can be downloaded.
void updateSourceWithUrl(SourceConfig sourceConfig,
                         String pkgUrl)
                  throws PulsarAdminException
Supported URLs are http and file.
Example
- File: file:///dir/fileName.jar
Parameter
| Name | Description |
|---|---|
| sourceConfig | The source configuration object |
| pkgUrl | URL from which pkg can be downloaded |
Exception
| Name | Description |
|---|---|
| PulsarAdminException.NotAuthorizedException | You don't have the admin permission |
| PulsarAdminException.NotFoundException | Cluster doesn't exist |
| PulsarAdminException | Unexpected error |
For more information, see updateSourceWithUrl.
Sink
To update a running Pulsar sink connector, you can use the following commands:
- Admin CLI
- REST API
- Java Admin API
Use the update subcommand.
pulsar-admin sinks update options
For the latest and complete information, see Pulsar admin docs.
Send a PUT request to this endpoint: PUT /admin/v3/sinks/{tenant}/{namespace}/{sinkName}
- Update a running sink connector with a local file.
void updateSink(SinkConfig sinkConfig,
                String fileName)
         throws PulsarAdminException
Parameter
| Name | Description |
|---|---|
| sinkConfig | The sink configuration object |
Exception
| Name | Description |
|---|---|
| PulsarAdminException.NotAuthorizedException | You don't have the admin permission |
| PulsarAdminException.NotFoundException | Cluster doesn't exist |
| PulsarAdminException | Unexpected error |
For more information, see updateSink.
- Update a sink connector using a remote file with a URL from which fun-pkg can be downloaded.
void updateSinkWithUrl(SinkConfig sinkConfig,
                       String pkgUrl)
                throws PulsarAdminException
Supported URLs are http and file.
Example
- File: file:///dir/fileName.jar
Parameter
| Name | Description |
|---|---|
| sinkConfig | The sink configuration object |
| pkgUrl | URL from which pkg can be downloaded |
Exception
| Name | Description |
|---|---|
| PulsarAdminException.NotAuthorizedException | You don't have the admin permission |
| PulsarAdminException.NotFoundException | Cluster doesn't exist |
| PulsarAdminException | Unexpected error |
For more information, see updateSinkWithUrl.
Stop a connector
stop
To stop a connector, you can use Admin CLI, REST API, or Java Admin API.
Source
To stop a source connector, you can use the following commands:
- Admin CLI
- REST API
- Java Admin API
Use the stop subcommand.
pulsar-admin sources stop options
For the latest and complete information, see Pulsar admin docs.
- Stop all instances of a source connector. Send a POST request to this endpoint: POST /admin/v3/sources/{tenant}/{namespace}/{sourceName}/stop
- Stop a specified instance of a source connector. Send a POST request to this endpoint: POST /admin/v3/sources/{tenant}/{namespace}/{sourceName}/{instanceId}/stop
- Stop all instances of a source connector.
void stopSource(String tenant,
                String namespace,
                String source)
         throws PulsarAdminException
Parameter
| Name | Description |
|---|---|
| tenant | Tenant name |
| namespace | Namespace name |
| source | Source name |
Exception
| Name | Description |
|---|---|
| PulsarAdminException | Unexpected error |
For more information, see stopSource.
- Stop a specified instance of a source connector.
void stopSource(String tenant,
                String namespace,
                String source,
                int instanceId)
         throws PulsarAdminException
Parameter
| Name | Description |
|---|---|
| tenant | Tenant name |
| namespace | Namespace name |
| source | Source name |
| instanceId | Source instance ID |
Exception
| Name | Description |
|---|---|
| PulsarAdminException | Unexpected error |
For more information, see stopSource.
Sink
To stop a sink connector, you can use the following commands:
- Admin CLI
- REST API
- Java Admin API
Use the stop subcommand.
pulsar-admin sinks stop options
For the latest and complete information, see Pulsar admin docs.
- Stop all instances of a sink connector. Send a POST request to this endpoint: POST /admin/v3/sinks/{tenant}/{namespace}/{sinkName}/stop
- Stop a specified instance of a sink connector. Send a POST request to this endpoint: POST /admin/v3/sinks/{tenant}/{namespace}/{sinkName}/{instanceId}/stop
- Stop all instances of a sink connector.
void stopSink(String tenant,
              String namespace,
              String sink)
       throws PulsarAdminException
Parameter
| Name | Description |
|---|---|
| tenant | Tenant name |
| namespace | Namespace name |
| sink | Sink name |
Exception
| Name | Description |
|---|---|
| PulsarAdminException | Unexpected error |
For more information, see stopSink.
- Stop a specified instance of a sink connector.
void stopSink(String tenant,
              String namespace,
              String sink,
              int instanceId)
       throws PulsarAdminException
Parameter
| Name | Description |
|---|---|
| tenant | Tenant name |
| namespace | Namespace name |
| sink | Sink name |
| instanceId | Sink instance ID |
Exception
| Name | Description |
|---|---|
| PulsarAdminException | Unexpected error |
For more information, see stopSink.
Restart a connector
restart
To restart a connector, you can use Admin CLI, REST API, or Java Admin API.
Source
To restart a source connector, you can use the following commands:
- Admin CLI
- REST API
- Java Admin API
Use the restart subcommand.
pulsar-admin sources restart options
For the latest and complete information, see Pulsar admin docs.
- Restart all instances of a source connector. Send a POST request to this endpoint: POST /admin/v3/sources/{tenant}/{namespace}/{sourceName}/restart
- Restart a specified instance of a source connector. Send a POST request to this endpoint: POST /admin/v3/sources/{tenant}/{namespace}/{sourceName}/{instanceId}/restart
- Restart all instances of a source connector.
void restartSource(String tenant,
                   String namespace,
                   String source)
            throws PulsarAdminException
Parameter
| Name | Description |
|---|---|
| tenant | Tenant name |
| namespace | Namespace name |
| source | Source name |
Exception
| Name | Description |
|---|---|
| PulsarAdminException | Unexpected error |
For more information, see restartSource.
- Restart a specified instance of a source connector.
void restartSource(String tenant,
                   String namespace,
                   String source,
                   int instanceId)
            throws PulsarAdminException
Parameter
| Name | Description |
|---|---|
| tenant | Tenant name |
| namespace | Namespace name |
| source | Source name |
| instanceId | Source instance ID |
Exception
| Name | Description |
|---|---|
| PulsarAdminException | Unexpected error |
For more information, see restartSource.
Sink
To restart a sink connector, you can use the following commands:
- Admin CLI
- REST API
- Java Admin API
Use the restart subcommand.
pulsar-admin sinks restart options
For the latest and complete information, see Pulsar admin docs.
- Restart all instances of a sink connector. Send a POST request to this endpoint: POST /admin/v3/sinks/{tenant}/{namespace}/{sinkName}/restart
- Restart a specified instance of a sink connector. Send a POST request to this endpoint: POST /admin/v3/sinks/{tenant}/{namespace}/{sinkName}/{instanceId}/restart
- Restart all instances of a sink connector.
void restartSink(String tenant,
                 String namespace,
                 String sink)
          throws PulsarAdminException
Parameter
| Name | Description |
|---|---|
| tenant | Tenant name |
| namespace | Namespace name |
| sink | Sink name |
Exception
| Name | Description |
|---|---|
| PulsarAdminException | Unexpected error |
For more information, see restartSink.
- Restart a specified instance of a sink connector.
void restartSink(String tenant,
                 String namespace,
                 String sink,
                 int instanceId)
          throws PulsarAdminException
Parameter
| Name | Description |
|---|---|
| tenant | Tenant name |
| namespace | Namespace name |
| sink | Sink name |
| instanceId | Sink instance ID |
Exception
| Name | Description |
|---|---|
| PulsarAdminException | Unexpected error |
For more information, see restartSink.
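The start, stop, and restart REST endpoints in this guide share one shape and differ only in the final path segment. One possible way to capture that is a small enum, sketched below. The names `ConnectorLifecycleSketch` and `Action`, the `http://localhost:8080` admin URL, and the connector names are hypothetical; the requests are built but not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.Locale;

final class ConnectorLifecycleSketch {
    // Assumed admin service URL; replace it with the address of your deployment.
    static final String ADMIN_URL = "http://localhost:8080";

    // Hypothetical enum: each constant maps to the last segment of the endpoint.
    enum Action {
        START, STOP, RESTART;

        String pathSegment() {
            return name().toLowerCase(Locale.ROOT);
        }
    }

    // kind is "sources" or "sinks"; the request applies to all instances.
    static HttpRequest request(Action action, String kind, String tenant,
                               String namespace, String name) {
        String url = ADMIN_URL + "/admin/v3/" + kind + "/" + tenant + "/"
                + namespace + "/" + name + "/" + action.pathSegment();
        return HttpRequest.newBuilder(URI.create(url))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) {
        HttpRequest restart = request(Action.RESTART, "sources", "public", "default", "my-source");
        HttpRequest stop = request(Action.STOP, "sinks", "public", "default", "my-sink");
        System.out.println(restart.method() + " " + restart.uri());
        System.out.println(stop.method() + " " + stop.uri());
    }
}
```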
Delete a connector
delete
To delete a connector, you can use Admin CLI, REST API, or Java Admin API.
Source
To delete a source connector, you can use the following commands:
- Admin CLI
- REST API
- Java Admin API
Use the delete subcommand.
pulsar-admin sources delete options
For the latest and complete information, see Pulsar admin docs.
Delete a Pulsar source connector.
Send a DELETE request to this endpoint: DELETE /admin/v3/sources/{tenant}/{namespace}/{sourceName}
Delete a source connector.
void deleteSource(String tenant,
                  String namespace,
                  String source)
           throws PulsarAdminException
Parameter
| Name | Description | 
|---|---|
| tenant | Tenant name | 
| namespace | Namespace name | 
| source | Source name | 
Exception
| Name | Description | 
|---|---|
| PulsarAdminException.NotAuthorizedException | You don't have the admin permission | 
| PulsarAdminException.NotFoundException | Cluster doesn't exist | 
| PulsarAdminException.PreconditionFailedException | Cluster is not empty | 
| PulsarAdminException | Unexpected error | 
For more information, see deleteSource.
Sink
To delete a sink connector, you can use the following commands:
- Admin CLI
- REST API
- Java Admin API
Use the delete subcommand.
pulsar-admin sinks delete options
For the latest and complete information, see Pulsar admin docs.
Delete a sink connector.
Send a DELETE request to this endpoint: DELETE /admin/v3/sinks/{tenant}/{namespace}/{sinkName}
Delete a Pulsar sink connector.
void deleteSink(String tenant,
                String namespace,
                String sink)
         throws PulsarAdminException
Parameter
| Name | Description | 
|---|---|
| tenant | Tenant name | 
| namespace | Namespace name | 
| sink | Sink name | 
Exception
| Name | Description | 
|---|---|
| PulsarAdminException.NotAuthorizedException | You don't have the admin permission | 
| PulsarAdminException.NotFoundException | Cluster doesn't exist | 
| PulsarAdminException.PreconditionFailedException | Cluster is not empty | 
| PulsarAdminException | Unexpected error | 
For more information, see deleteSink.
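For completeness, the delete endpoints can be addressed the same way as the other REST calls, using the HTTP DELETE method. This is a minimal sketch with the JDK's `java.net.http` API; the class name `DeleteConnectorRequest`, the `http://localhost:8080` admin URL, and the connector names are assumptions, and the request is only built, not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class DeleteConnectorRequest {
    // Assumed admin service URL; replace it with the address of your deployment.
    static final String ADMIN_URL = "http://localhost:8080";

    // kind is "sources" or "sinks"; name is the connector to delete.
    static HttpRequest delete(String kind, String tenant, String namespace, String name) {
        String url = ADMIN_URL + "/admin/v3/" + kind + "/" + tenant + "/"
                + namespace + "/" + name;
        return HttpRequest.newBuilder(URI.create(url)).DELETE().build();
    }

    public static void main(String[] args) {
        HttpRequest source = delete("sources", "public", "default", "my-source");
        HttpRequest sink = delete("sinks", "public", "default", "my-sink");
        System.out.println(source.method() + " " + source.uri());
        System.out.println(sink.method() + " " + sink.uri());
    }
}
```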