ElasticSearch sink connector

The ElasticSearch sink connector pulls messages from Pulsar topics and persists them to Elasticsearch indexes.

Configuration

The configuration of the ElasticSearch sink connector has the following properties.

Property

| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| elasticSearchUrl | String | true | " " (empty string) | The URL of the Elasticsearch cluster to which the connector connects. |
| indexName | String | true | " " (empty string) | The index name to which the connector writes messages. |
| typeName | String | false | "_doc" | The type name to which the connector writes messages. The value should be set explicitly to a valid type name other than "_doc" for Elasticsearch versions before 6.2, and left at the default otherwise. |
| indexNumberOfShards | int | false | 1 | The number of shards of the index. |
| indexNumberOfReplicas | int | false | 1 | The number of replicas of the index. |
| username | String | false | " " (empty string) | The username used by the connector to connect to the Elasticsearch cluster. If username is set, then password should also be provided. |
| password | String | false | " " (empty string) | The password used by the connector to connect to the Elasticsearch cluster. If username is set, then password should also be provided. |
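The rules in the table can be checked before a configuration is handed to the connector. The following is a minimal sketch in Python, not part of the connector itself: the helper name `validate_sink_config` is hypothetical, and it checks only what the table states (the two required properties, and a password whenever a username is set).

```python
from typing import Any, Dict, List

# Properties the table marks as required.
REQUIRED_PROPERTIES = ("elasticSearchUrl", "indexName")


def validate_sink_config(config: Dict[str, Any]) -> List[str]:
    """Return a list of problems in a sink config dict (hypothetical helper)."""
    problems = []
    for name in REQUIRED_PROPERTIES:
        value = config.get(name)
        # The default is an empty string, so a missing or blank value is an error.
        if not isinstance(value, str) or not value.strip():
            problems.append(f"{name} is required")
    # If username is set, then password should also be provided.
    if config.get("username") and not config.get("password"):
        problems.append("password is required when username is set")
    return problems


if __name__ == "__main__":
    print(validate_sink_config({
        "elasticSearchUrl": "http://localhost:9200",
        "indexName": "my_index",
        "username": "scooby",
    }))
```

An empty list means the configuration satisfies these checks. The example call reports the missing password.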

Example

Before using the ElasticSearch sink connector, you need to create a configuration file through one of the following methods.

Configuration

For Elasticsearch 6.2 and Later

  • JSON


    {
      "elasticSearchUrl": "http://localhost:9200",
      "indexName": "my_index",
      "username": "scooby",
      "password": "doobie"
    }

  • YAML


    configs:
      elasticSearchUrl: "http://localhost:9200"
      indexName: "my_index"
      username: "scooby"
      password: "doobie"

For Elasticsearch Before 6.2

  • JSON


    {
      "elasticSearchUrl": "http://localhost:9200",
      "indexName": "my_index",
      "typeName": "doc",
      "username": "scooby",
      "password": "doobie"
    }

  • YAML


    configs:
      elasticSearchUrl: "http://localhost:9200"
      indexName: "my_index"
      typeName: "doc"
      username: "scooby"
      password: "doobie"
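The only difference between the two variants above is whether typeName is set. As an illustration, the sketch below builds the configuration from an Elasticsearch version and adds typeName only for versions before 6.2. The function `build_sink_config` and its parameters are hypothetical names chosen for this example, and the version is assumed to be given as a (major, minor) tuple.

```python
import json
from typing import Dict, Tuple


def build_sink_config(url: str, index: str, es_version: Tuple[int, int],
                      legacy_type_name: str = "doc") -> Dict[str, str]:
    """Build a sink config dict; helper and parameter names are hypothetical."""
    config = {"elasticSearchUrl": url, "indexName": index}
    # Before 6.2, typeName must be set explicitly to something other than "_doc".
    if es_version < (6, 2):
        if legacy_type_name == "_doc":
            raise ValueError('typeName must not be "_doc" before Elasticsearch 6.2')
        config["typeName"] = legacy_type_name
    return config


if __name__ == "__main__":
    # Serialize to compact JSON, the form used for an inline sink configuration.
    print(json.dumps(build_sink_config("http://localhost:9200", "my_index", (7, 5))))
    print(json.dumps(build_sink_config("http://localhost:9200", "my_index", (6, 1))))
```

For 6.2 and later, typeName is omitted so that the connector falls back to its default of "_doc".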

Usage

  1. Start a single-node Elasticsearch cluster.


    $ docker run -p 9200:9200 -p 9300:9300 \
    -e "discovery.type=single-node" \
    docker.elastic.co/elasticsearch/elasticsearch:7.5.1

  2. Start a Pulsar service locally in standalone mode.


    $ bin/pulsar standalone

    Make sure the NAR file is available at connectors/pulsar-io-elastic-search-2.6.0.nar.

  3. Start the Pulsar Elasticsearch connector in local run mode using one of the following methods.

    • Use the JSON configuration as shown previously.


      $ bin/pulsar-admin sinks localrun \
      --archive connectors/pulsar-io-elastic-search-2.6.0.nar \
      --tenant public \
      --namespace default \
      --name elasticsearch-test-sink \
      --sink-config '{"elasticSearchUrl":"http://localhost:9200","indexName": "my_index","username": "scooby","password": "doobie"}' \
      --inputs elasticsearch_test

    • Use the YAML configuration file as shown previously.


      $ bin/pulsar-admin sinks localrun \
      --archive connectors/pulsar-io-elastic-search-2.6.0.nar \
      --tenant public \
      --namespace default \
      --name elasticsearch-test-sink \
      --sink-config-file elasticsearch-sink.yml \
      --inputs elasticsearch_test

  4. Publish records to the topic.


    $ bin/pulsar-client produce elasticsearch_test --messages "{\"a\":1}"

  5. Check documents in Elasticsearch.

    • Refresh the index.


      $ curl -s http://localhost:9200/my_index/_refresh

    • Search documents.


      $ curl -s http://localhost:9200/my_index/_search

      You can see that the record published earlier has been successfully written to Elasticsearch.


      {"took":2,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":1,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"my_index","_type":"_doc","_id":"FSxemm8BLjG_iC0EeTYJ","_score":1.0,"_source":{"a":1}}]}}
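The search response is compact JSON, so the written record is easier to confirm by extracting the `_source` of each hit. The following is a small sketch using only the Python standard library; the helper name `extract_sources` is hypothetical, and the sample string is the response shown above.

```python
import json
from typing import Any, Dict, List

# The _search response from the step above, copied verbatim.
SAMPLE_RESPONSE = '{"took":2,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":1,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"my_index","_type":"_doc","_id":"FSxemm8BLjG_iC0EeTYJ","_score":1.0,"_source":{"a":1}}]}}'


def extract_sources(search_response: str) -> List[Dict[str, Any]]:
    """Return the _source document of every hit in a _search response."""
    body = json.loads(search_response)
    return [hit["_source"] for hit in body["hits"]["hits"]]


if __name__ == "__main__":
    print(extract_sources(SAMPLE_RESPONSE))  # prints [{'a': 1}]
```

The single extracted document matches the message published in step 4.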