Skip to main content

Elasticsearch sink connector

note

You can download all the Pulsar connectors on download page.

The Elasticsearch sink connector pulls messages from Pulsar topics and persists the messages to indexes.

Requirements

To deploy an Elasticsearch sink connector, the following are required:

  • Elasticsearch 7 (Elasticsearch 8 will be supported in the future)
  • OpenSearch 1.x

Feature

Handle data

Since Pulsar 2.9.0, the Elasticsearch sink connector has the following ways of working. You can choose one of them.

NameDescription
Raw processingThe sink reads from topics and passes the raw content to Elasticsearch.

This is the default behavior.

Raw processing was already available in Pulsar 2.8.x.
Schema awareThe sink uses the schema and handles AVRO, JSON, and KeyValue schema types while mapping the content to the Elasticsearch document.

If you set schemaEnable to true, the sink interprets the contents of the message and you can define a primary key that in turn used as the special _id field on Elasticsearch.


This allows you to perform `UPDATE`, `INSERT`, and `DELETE` operations to Elasticsearch driven by the logical primary key of the message.

This is very useful in a typical Change Data Capture scenario in which you follow the changes on your database, write them to Pulsar (using the Debezium adapter for instance), and then you write to Elasticsearch.

You configure the mapping of the primary key using the `primaryFields` configuration entry.

The `DELETE` operation can be performed when the primary key is not empty and the remaining value is empty. Use the `nullValueAction` to configure this behavior. The default configuration simply ignores such empty values.

Map multiple indexes

Since Pulsar 2.9.0, the indexName property is no more required. If you omit it, the sink writes to an index name after the Pulsar topic name.

Enable bulk writes

Since Pulsar 2.9.0, you can use bulk writes by setting the bulkEnabled property to true.

Enable secure connections via TLS

Since Pulsar 2.9.0, you can enable secure connections with TLS.

Configuration

The configuration of the Elasticsearch sink connector has the following properties.

Property

NameTypeRequiredDefaultDescription
elasticSearchUrlStringtrue" " (empty string)The URL of elastic search cluster to which the connector connects.
indexNameStringfalse" " (empty string)The index name to which the connector writes messages. The default value is the topic name. It accepts date formats in the name to support event time based index with the pattern %{+<date-format>}. For example, suppose the event time of the record is 1645182000000L, the indexName is logs-%{+yyyy-MM-dd}, then the formatted index name would be logs-2022-02-18.
schemaEnableBooleanfalsefalseTurn on the Schema Aware mode.
createIndexIfNeededBooleanfalsefalseManage index if missing.
maxRetriesIntegerfalse1The maximum number of retries for elasticsearch requests. Use -1 to disable it.
retryBackoffInMsIntegerfalse100The base time to wait when retrying an Elasticsearch request (in milliseconds).
maxRetryTimeInSecIntegerfalse86400The maximum retry time interval in seconds for retrying an elasticsearch request.
bulkEnabledBooleanfalsefalseEnable the elasticsearch bulk processor to flush write requests based on the number or size of requests, or after a given period.
bulkActionsIntegerfalse1000The maximum number of actions per elasticsearch bulk request. Use -1 to disable it.
bulkSizeInMbIntegerfalse5The maximum size in megabytes of elasticsearch bulk requests. Use -1 to disable it.
bulkConcurrentRequestsIntegerfalse0The maximum number of in flight elasticsearch bulk requests. The default 0 allows the execution of a single request. A value of 1 means 1 concurrent request is allowed to be executed while accumulating new bulk requests.
bulkFlushIntervalInMsLongfalse1000The maximum period of time to wait for flushing pending writes when bulk writes are enabled. -1 or zero means the scheduled flushing is disabled.
compressionEnabledBooleanfalsefalseEnable elasticsearch request compression.
connectTimeoutInMsIntegerfalse5000The elasticsearch client connection timeout in milliseconds.
connectionRequestTimeoutInMsIntegerfalse1000The time in milliseconds for getting a connection from the elasticsearch connection pool.
connectionIdleTimeoutInMsIntegerfalse5Idle connection timeout to prevent a read timeout.
keyIgnoreBooleanfalsetrueWhether to ignore the record key to build the Elasticsearch document _id. If primaryFields is defined, the connector extract the primary fields from the payload to build the document _id If no primaryFields are provided, elasticsearch auto generates a random document _id.
primaryFieldsStringfalse"id"The comma separated ordered list of field names used to build the Elasticsearch document _id from the record value. If this list is a singleton, the field is converted as a string. If this list has 2 or more fields, the generated _id is a string representation of a JSON array of the field values.
nullValueActionenum (IGNORE,DELETE,FAIL)falseIGNOREHow to handle records with null values, possible options are IGNORE, DELETE or FAIL. Default is IGNORE the message.
malformedDocActionenum (IGNORE,WARN,FAIL)falseFAILHow to handle elasticsearch rejected documents due to some malformation. Possible options are IGNORE, DELETE or FAIL. Default is FAIL the Elasticsearch document.
stripNullsBooleanfalsetrueIf stripNulls is false, elasticsearch _source includes 'null' for empty fields (for example {"foo": null}), otherwise null fields are stripped.
socketTimeoutInMsIntegerfalse60000The socket timeout in milliseconds waiting to read the elasticsearch response.
typeNameStringfalse"_doc"The type name to which the connector writes messages to.

The value should be set explicitly to a valid type name other than "_doc" for Elasticsearch version before 6.2, and left to default otherwise.
indexNumberOfShardsintfalse1The number of shards of the index.
indexNumberOfReplicasintfalse1The number of replicas of the index.
usernameStringfalse" " (empty string)The username used by the connector to connect to the elastic search cluster.

If username is set, then password should also be provided.
passwordStringfalse" " (empty string)The password used by the connector to connect to the elastic search cluster.

If username is set, then password should also be provided.
sslElasticSearchSslConfigfalseConfiguration for TLS encrypted communication
compatibilityModeenum (AUTO,ELASTICSEARCH,ELASTICSEARCH_7,OPENSEARCH)falseAUTOSpecify compatibility mode with the ElasticSearch cluster. AUTO value will try to auto detect the correct compatibility mode to use. Use ELASTICSEARCH_7 if the target cluster is running ElasticSearch 7 or prior. Use ELASTICSEARCH if the target cluster is running ElasticSearch 8 or higher. Use OPENSEARCH if the target cluster is running OpenSearch.
tokenStringfalse" " (empty string)The token used by the connector to connect to the ElasticSearch cluster. Only one between basic/token/apiKey authentication mode must be configured.
apiKeyStringfalse" " (empty string)The apiKey used by the connector to connect to the ElasticSearch cluster. Only one between basic/token/apiKey authentication mode must be configured.
canonicalKeyFieldsBooleanfalsefalseWhether to sort the key fields for JSON and Avro or not. If it is set to true and the record key schema is JSON or AVRO, the serialized object does not consider the order of properties.
stripNonPrintableCharactersBooleanfalsetrueWhether to remove all non-printable characters from the document or not. If it is set to true, all non-printable characters are removed from the document.
idHashingAlgorithmenum(NONE,SHA256,SHA512)falseNONEHashing algorithm to use for the document id. This is useful in order to be compliant with the ElasticSearch _id hard limit of 512 bytes.
conditionalIdHashingBooleanfalsefalseThis option only works if idHashingAlgorithm is set. If enabled, the hashing is performed only if the id is greater than 512 bytes otherwise the hashing is performed on each document in any case.
copyKeyFieldsBooleanfalsefalseIf the message key schema is AVRO or JSON, the message key fields are copied into the ElasticSearch document.

Definition of ElasticSearchSslConfig structure:

NameTypeRequiredDefaultDescription
enabledBooleanfalsefalseEnable SSL/TLS.
hostnameVerificationBooleanfalsetrueWhether or not to validate node hostnames when using SSL.
disableCertificateValidationBooleanfalsetrueWhether or not to disable the node certificate validation. Changing this value is high insecure and you should not use it in production environment.
truststorePathStringfalse" " (empty string)The path to the truststore file.
truststorePasswordStringfalse" " (empty string)Truststore password.
keystorePathStringfalse" " (empty string)The path to the keystore file.
keystorePasswordStringfalse" " (empty string)Keystore password.
cipherSuitesStringfalse" " (empty string)SSL/TLS cipher suites.
protocolsStringfalse"TLSv1.2"Comma separated list of enabled SSL/TLS protocols.

Example

Before using the Elasticsearch sink connector, you need to create a configuration file through one of the following methods.

Configuration

For Elasticsearch After 6.2

  • JSON

    {
    "configs": {
    "elasticSearchUrl": "http://localhost:9200",
    "indexName": "my_index",
    "username": "scooby",
    "password": "doobie"
    }
    }
  • YAML

    configs:
    elasticSearchUrl: "http://localhost:9200"
    indexName: "my_index"
    username: "scooby"
    password: "doobie"

For Elasticsearch Before 6.2

  • JSON

    {
    "elasticSearchUrl": "http://localhost:9200",
    "indexName": "my_index",
    "typeName": "doc",
    "username": "scooby",
    "password": "doobie"
    }
  • YAML

    configs:
    elasticSearchUrl: "http://localhost:9200"
    indexName: "my_index"
    typeName: "doc"
    username: "scooby"
    password: "doobie"

Usage

  1. Start a single node Elasticsearch cluster.

    docker run -p 9200:9200 -p 9300:9300 \
    -e "discovery.type=single-node" \
    docker.elastic.co/elasticsearch/elasticsearch:7.13.3
  2. Start a Pulsar service locally in standalone mode.

    bin/pulsar standalone

    Make sure the NAR file is available at connectors/pulsar-io-elastic-search-3.0.3.nar.

  3. Start the Pulsar Elasticsearch connector in local run mode using one of the following methods.

    • Use the JSON configuration as shown previously.

      bin/pulsar-admin sinks localrun \
      --archive connectors/pulsar-io-elastic-search-3.0.3.nar \
      --tenant public \
      --namespace default \
      --name elasticsearch-test-sink \
      --sink-config '{"elasticSearchUrl":"http://localhost:9200","indexName": "my_index","username": "scooby","password": "doobie"}' \
      --inputs elasticsearch_test
    • Use the YAML configuration file as shown previously.

      bin/pulsar-admin sinks localrun \
      --archive connectors/pulsar-io-elastic-search-3.0.3.nar \
      --tenant public \
      --namespace default \
      --name elasticsearch-test-sink \
      --sink-config-file elasticsearch-sink.yml \
      --inputs elasticsearch_test
  4. Publish records to the topic.

    bin/pulsar-client produce elasticsearch_test --messages "{\"a\":1}"
  5. Check documents in Elasticsearch.

    • refresh the index

      curl -s http://localhost:9200/my_index/_refresh
    • search documents

      curl -s http://localhost:9200/my_index/_search

      You can see the record that published earlier has been successfully written into Elasticsearch.

      {"took":2,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":1,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"my_index","_type":"_doc","_id":"FSxemm8BLjG_iC0EeTYJ","_score":1.0,"_source":{"a":1}}]}}