Skip to main content

JDBC sink connector

note

You can download all the Pulsar connectors on download page.

The JDBC sink connectors allow pulling messages from Pulsar topics and persist the messages to ClickHouse, MariaDB, PostgreSQL, and SQLite.

Currently, INSERT, DELETE and UPDATE operations are supported. SQLite, MariaDB and PostgreSQL also support UPSERT operations and idempotent writes.

Configuration​

The configuration of all JDBC sink connectors has the following properties.

Property​

NameTypeRequiredDefaultDescription
userNameStringfalse" " (empty string)The username used to connect to the database specified by jdbcUrl.

Note: userName is case-sensitive.
passwordStringfalse" " (empty string)The password used to connect to the database specified by jdbcUrl.

Note: password is case-sensitive.
jdbcUrlStringtrue" " (empty string)The JDBC URL of the database that the connector connects to.
tableNameStringtrue" " (empty string)The name of the table that the connector writes to.
nonKeyStringfalse" " (empty string)A comma-separated list containing the fields used in updating events.
keyStringfalse" " (empty string)A comma-separated list containing the fields used in where condition of updating and deleting events.
timeoutMsintfalse500The JDBC operation timeout in milliseconds.
batchSizeintfalse200The batch size of updates made to the database.
insertModeenum( INSERT,UPSERT,UPDATE)falseINSERTIf it is configured as UPSERT, the sink uses upsert semantics rather than plain INSERT/UPDATE statements. Upsert semantics refer to atomically adding a new row or updating the existing row if there is a primary key constraint violation, which provides idempotence.
nullValueActionenum(FAIL, DELETE)falseFAILHow to handle records with NULL values. Possible options are DELETE or FAIL.
useTransactionsbooleanfalsetrueEnable transactions of the database.
excludeNonDeclaredFieldsbooleanfalsefalseAll the table fields are discovered automatically. excludeNonDeclaredFields indicates if the table fields not explicitly listed in nonKey and key must be included in the query. By default all the table fields are included. To leverage of table fields defaults during insertion, it is suggested to set this value to false.
useJdbcBatchbooleanfalsefalseUse the JDBC batch API. This option is suggested to improve write performance.

Example of ClickHouse​

  • JSON

    {
    "configs": {
    "userName": "clickhouse",
    "password": "password",
    "jdbcUrl": "jdbc:clickhouse://localhost:8123/pulsar_clickhouse_jdbc_sink",
    "tableName": "pulsar_clickhouse_jdbc_sink"
    "useTransactions": "false"
    }
    }
  • YAML

    tenant: "public"
    namespace: "default"
    name: "jdbc-clickhouse-sink"
    inputs: [ "persistent://public/default/jdbc-clickhouse-topic" ]
    sinkType: "jdbc-clickhouse"
    configs:
    userName: "clickhouse"
    password: "password"
    jdbcUrl: "jdbc:clickhouse://localhost:8123/pulsar_clickhouse_jdbc_sink"
    tableName: "pulsar_clickhouse_jdbc_sink"
    useTransactions: "false"

Example of MariaDB​

  • JSON

    {
    "configs": {
    "userName": "mariadb",
    "password": "password",
    "jdbcUrl": "jdbc:mariadb://localhost:3306/pulsar_mariadb_jdbc_sink",
    "tableName": "pulsar_mariadb_jdbc_sink"
    }
    }
  • YAML

    tenant: "public"
    namespace: "default"
    name: "jdbc-mariadb-sink"
    inputs: [ "persistent://public/default/jdbc-mariadb-topic" ]
    sinkType: "jdbc-mariadb"
    configs:
    userName: "mariadb"
    password: "password"
    jdbcUrl: "jdbc:mariadb://localhost:3306/pulsar_mariadb_jdbc_sink"
    tableName: "pulsar_mariadb_jdbc_sink"

Example of OpenMLDB​

OpenMLDB does not support DELETE and UPDATE operations

  • JSON
{
"configs": {
"jdbcUrl": "jdbc:openmldb:///pulsar_openmldb_db?zk=localhost:6181&zkPath=/openmldb",
"tableName": "pulsar_openmldb_jdbc_sink"
}
}
  • YAML

    tenant: "public"
    namespace: "default"
    name: "jdbc-openmldb-sink"
    inputs: [ "persistent://public/default/jdbc-openmldb-topic" ]
    sinkType: "jdbc-openmldb"
    configs:
    jdbcUrl: "jdbc:openmldb:///pulsar_openmldb_db?zk=localhost:6181&zkPath=/openmldb"
    tableName: "pulsar_openmldb_jdbc_sink"

Example of PostgreSQL​

Before using the JDBC PostgreSQL sink connector, you need to create a configuration file through one of the following methods.

  • JSON

    {
    "configs": {
    "userName": "postgres",
    "password": "password",
    "jdbcUrl": "jdbc:postgresql://localhost:5432/pulsar_postgres_jdbc_sink",
    "tableName": "pulsar_postgres_jdbc_sink"
    }
    }
  • YAML

    tenant: "public"
    namespace: "default"
    name: "jdbc-postgres-sink"
    inputs: [ "persistent://public/default/jdbc-postgres-topic" ]
    sinkType: "jdbc-postgres"
    configs:
    userName: "postgres"
    password: "password"
    jdbcUrl: "jdbc:postgresql://localhost:5432/pulsar_postgres_jdbc_sink"
    tableName: "pulsar_postgres_jdbc_sink"

For more information on how to use this JDBC sink connector, see connect Pulsar to PostgreSQL.

Example of SQLite​

  • JSON

    {
    "configs": {
    "jdbcUrl": "jdbc:sqlite:db.sqlite",
    "tableName": "pulsar_sqlite_jdbc_sink"
    }
    }
  • YAML

    tenant: "public"
    namespace: "default"
    name: "jdbc-sqlite-sink"
    inputs: [ "persistent://public/default/jdbc-sqlite-topic" ]
    sinkType: "jdbc-sqlite"
    configs:
    jdbcUrl: "jdbc:sqlite:db.sqlite"
    tableName: "pulsar_sqlite_jdbc_sink"