Skip to main content

Kinesis sink connector

The Kinesis sink connector pulls data from Pulsar and persists data into Amazon Kinesis.

Configuration​

The configuration of the Kinesis sink connector has the following property.

Property​

NameTypeRequiredDefaultDescription
messageFormatMessageFormattrueONLY_RAW_PAYLOADMessage format in which Kinesis sink converts Pulsar messages and publishes to Kinesis streams.

Below are the available options:

  • ONLY_RAW_PAYLOAD: Kinesis sink directly publishes Pulsar message payload as a message into the configured Kinesis stream.

  • FULL_MESSAGE_IN_JSON: Kinesis sink creates a JSON payload with Pulsar message payload, properties and encryptionCtx, and publishes JSON payload into the configured Kinesis stream.

  • FULL_MESSAGE_IN_FB: Kinesis sink creates a flatbuffer serialized payload with Pulsar message payload, properties and encryptionCtx, and publishes flatbuffer payload into the configured Kinesis stream.
  • retainOrderingbooleanfalsefalseWhether Pulsar connectors to retain ordering when moving messages from Pulsar to Kinesis or not.
    awsEndpointStringfalse" " (empty string)The Kinesis end-point URL, which can be found at here.
    awsRegionStringfalse" " (empty string)The AWS region.

    Example
    us-west-1, us-west-2
    awsKinesisStreamNameStringtrue" " (empty string)The Kinesis stream name.
    awsCredentialPluginNameStringfalse" " (empty string)The fully-qualified class name of implementation of AwsCredentialProviderPlugin.

    It is a factory class which creates an AWSCredentialsProvider that is used by Kinesis sink.

    If it is empty, the Kinesis sink creates a default AWSCredentialsProvider which accepts json-map of credentials in awsCredentialPluginParam.
    awsCredentialPluginParamStringfalse" " (empty string)The JSON parameter to initialize awsCredentialsProviderPlugin.

    Built-in plugins​

    The following are built-in AwsCredentialProviderPlugin plugins:

    • org.apache.pulsar.io.aws.AwsDefaultProviderChainPlugin

      This plugin takes no configuration, it uses the default AWS provider chain.

      For more information, see AWS documentation.

    • org.apache.pulsar.io.aws.STSAssumeRoleProviderPlugin

      This plugin takes a configuration (via the awsCredentialPluginParam) that describes a role to assume when running the KCL.

      This configuration takes the form of a small json document like:


      {"roleArn": "arn...", "roleSessionName": "name"}

    Example​

    Before using the Kinesis sink connector, you need to create a configuration file through one of the following methods.

    • JSON


      {
      "awsEndpoint": "https://some.endpoint.aws",
      "awsRegion": "us-east-1",
      "awsKinesisStreamName": "my-stream",
      "awsCredentialPluginParam": "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}",
      "messageFormat": "ONLY_RAW_PAYLOAD",
      "retainOrdering": "true"
      }

    • YAML


      configs:
      awsEndpoint: "https://some.endpoint.aws"
      awsRegion: "us-east-1"
      awsKinesisStreamName: "my-stream"
      awsCredentialPluginParam: "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}"
      messageFormat: "ONLY_RAW_PAYLOAD"
      retainOrdering: "true"