
Kinesis sink connector

Note: You can download all the Pulsar connectors from the download page.

The Kinesis sink connector pulls data from Pulsar and persists data into Amazon Kinesis.

Configuration

The Kinesis sink connector has the following configuration properties.

Property

Name | Type | Required | Default | Description
---- | ---- | -------- | ------- | -----------
messageFormat | MessageFormat | true | ONLY_RAW_PAYLOAD | The format in which the Kinesis sink converts Pulsar messages before publishing them to Kinesis streams. The available options are described below the table.
jsonIncludeNonNulls | boolean | false | true | When set to true, only properties with non-null values are included. Applies when the message format is FULL_MESSAGE_IN_JSON_EXPAND_VALUE.
jsonFlatten | boolean | false | false | When set to true and the message format is FULL_MESSAGE_IN_JSON_EXPAND_VALUE, the output JSON is flattened.
retainOrdering | boolean | false | false | Whether the connector retains ordering when moving messages from Pulsar to Kinesis.
awsEndpoint | String | false | "" (empty string) | The Kinesis endpoint URL. Available endpoints are listed in the AWS documentation.
awsRegion | String | false | "" (empty string) | The AWS region, for example us-west-1 or us-west-2.
awsKinesisStreamName | String | true | "" (empty string) | The Kinesis stream name.
awsCredentialPluginName | String | false | "" (empty string) | The fully qualified class name of an implementation of AwsCredentialProviderPlugin. This is a factory class that creates the AWSCredentialsProvider used by the Kinesis sink. If it is empty, the Kinesis sink creates a default AWSCredentialsProvider that accepts a JSON map of credentials in awsCredentialPluginParam.
awsCredentialPluginParam | String | false | "" (empty string) | The JSON parameter used to initialize the AwsCredentialProviderPlugin.

The available messageFormat options are:

  • ONLY_RAW_PAYLOAD: The Kinesis sink publishes the Pulsar message payload directly as a message into the configured Kinesis stream.

  • FULL_MESSAGE_IN_JSON: The Kinesis sink creates a JSON payload containing the Pulsar message payload, properties, and encryptionCtx, and publishes the JSON payload into the configured Kinesis stream.

  • FULL_MESSAGE_IN_FB: The Kinesis sink creates a FlatBuffers-serialized payload containing the Pulsar message payload, properties, and encryptionCtx, and publishes the FlatBuffers payload into the configured Kinesis stream.

  • FULL_MESSAGE_IN_JSON_EXPAND_VALUE: The Kinesis sink sends a JSON structure containing the record topic name, key, payload, properties, and event time. The record schema is used to convert the value to JSON.
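To make the effect of jsonFlatten more concrete, here is a minimal Python sketch of what flattening a nested JSON structure generally means. It is an illustration only: the record below is hypothetical, and joining nested keys with a dot is an assumption, since this page does not specify the key naming the connector produces.

```python
import json


def flatten(value, prefix=""):
    """Collapse nested dicts into one level, joining keys with dots (assumed separator)."""
    flat = {}
    for key, item in value.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(item, dict):
            # Recurse into nested objects, carrying the key path along.
            flat.update(flatten(item, path))
        else:
            flat[path] = item
    return flat


# Hypothetical record for illustration; not actual connector output.
record = {"key": "user-1", "payload": {"name": "Ada", "address": {"city": "London"}}}
flat_record = flatten(record)
print(json.dumps(flat_record))
```

A flattened structure has no nested objects, which can simplify consumers that expect a single level of fields.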

Built-in plugins

The following are built-in AwsCredentialProviderPlugin plugins:

  • org.apache.pulsar.io.aws.AwsDefaultProviderChainPlugin

    This plugin takes no configuration; it uses the default AWS provider chain.

    For more information, see the AWS documentation.

  • org.apache.pulsar.io.aws.STSAssumeRoleProviderPlugin

    This plugin takes a configuration (via awsCredentialPluginParam) that describes a role to assume when running the KCL.

    This configuration takes the form of a small JSON document, for example:

    {"roleArn": "arn...", "roleSessionName": "name"}
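Because awsCredentialPluginParam is a string that itself contains JSON, writing it by hand means escaping every inner quote. The following Python sketch shows one way to avoid that by serializing the role document with json.dumps. The helper name build_sink_config is hypothetical, the role values are the placeholders from the text above, and the stream name and region are sample values.

```python
import json

# Placeholder role document, as shown above; replace with real values.
role_config = {"roleArn": "arn...", "roleSessionName": "name"}


def build_sink_config(stream_name, region, role):
    """Build a sink configuration that selects the STS assume-role plugin.

    Hypothetical helper for illustration; it is not part of Pulsar.
    """
    return {
        "configs": {
            "awsRegion": region,
            "awsKinesisStreamName": stream_name,
            "awsCredentialPluginName": "org.apache.pulsar.io.aws.STSAssumeRoleProviderPlugin",
            # The plugin parameter is a JSON document stored as a string.
            "awsCredentialPluginParam": json.dumps(role),
            "messageFormat": "ONLY_RAW_PAYLOAD",
        }
    }


sink_config = build_sink_config("my-stream", "us-east-1", role_config)
# Dumping the outer document escapes the inner quotes automatically.
print(json.dumps(sink_config, indent=2))
```

Generating the file this way keeps the nested JSON valid even when the role values contain characters that need escaping.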

Example

Before using the Kinesis sink connector, you need to create a configuration file in one of the following formats.

  • JSON

    {
      "configs": {
        "awsEndpoint": "some.endpoint.aws",
        "awsRegion": "us-east-1",
        "awsKinesisStreamName": "my-stream",
        "awsCredentialPluginParam": "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}",
        "messageFormat": "ONLY_RAW_PAYLOAD",
        "retainOrdering": "true"
      }
    }

  • YAML

    configs:
      awsEndpoint: "some.endpoint.aws"
      awsRegion: "us-east-1"
      awsKinesisStreamName: "my-stream"
      awsCredentialPluginParam: "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}"
      messageFormat: "ONLY_RAW_PAYLOAD"
      retainOrdering: "true"
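A configuration like the ones above can be checked before it is handed to the connector. The Python sketch below is one possible pre-flight check, based only on the property table on this page: it verifies that the properties marked as required are present, that messageFormat is one of the documented options, and that awsCredentialPluginParam parses as JSON. The function name check_sink_config is hypothetical, and the connector's own validation may differ.

```python
import json

# Options and required properties as documented in the property table.
MESSAGE_FORMATS = {
    "ONLY_RAW_PAYLOAD",
    "FULL_MESSAGE_IN_JSON",
    "FULL_MESSAGE_IN_FB",
    "FULL_MESSAGE_IN_JSON_EXPAND_VALUE",
}
REQUIRED = ("messageFormat", "awsKinesisStreamName")


def check_sink_config(config):
    """Return a list of problems found in a {"configs": {...}} mapping."""
    problems = []
    configs = config.get("configs", {})
    for name in REQUIRED:
        if not configs.get(name):
            problems.append(f"missing required property: {name}")
    fmt = configs.get("messageFormat")
    if fmt and fmt not in MESSAGE_FORMATS:
        problems.append(f"unknown messageFormat: {fmt}")
    param = configs.get("awsCredentialPluginParam")
    if param:
        try:
            json.loads(param)  # the parameter must itself be a JSON document
        except (TypeError, ValueError):
            problems.append("awsCredentialPluginParam is not valid JSON")
    return problems


example = {
    "configs": {
        "awsEndpoint": "some.endpoint.aws",
        "awsRegion": "us-east-1",
        "awsKinesisStreamName": "my-stream",
        "awsCredentialPluginParam": "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}",
        "messageFormat": "ONLY_RAW_PAYLOAD",
        "retainOrdering": "true",
    }
}
print(check_sink_config(example))
```

An empty list means none of these checks failed; it does not guarantee that the credentials or the stream are valid on the AWS side.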