Kinesis sink connector
The Kinesis sink connector pulls data from Pulsar and persists it to Amazon Kinesis.
Configuration
The configuration of the Kinesis sink connector has the following properties.
Property
Name | Type | Required | Default | Description |
---|---|---|---|---|
messageFormat | MessageFormat | true | ONLY_RAW_PAYLOAD | The format into which the Kinesis sink converts Pulsar messages before publishing them to Kinesis streams. The available options are: ONLY_RAW_PAYLOAD: the sink publishes the Pulsar message payload directly as a message to the configured Kinesis stream. FULL_MESSAGE_IN_JSON: the sink creates a JSON payload containing the Pulsar message payload, properties, and encryptionCtx, and publishes it to the configured Kinesis stream. FULL_MESSAGE_IN_FB: the sink creates a FlatBuffers-serialized payload containing the Pulsar message payload, properties, and encryptionCtx, and publishes it to the configured Kinesis stream. FULL_MESSAGE_IN_JSON_EXPAND_VALUE: the sink sends a JSON structure containing the record's topic name, key, payload, properties, and event time; the record schema is used to convert the value to JSON. |
retainOrdering | boolean | false | false | Whether the connector retains message ordering when moving messages from Pulsar to Kinesis. |
awsEndpoint | String | false | "" (empty string) | The Kinesis endpoint URL, which can be found in the AWS documentation. |
awsRegion | String | false | "" (empty string) | The AWS region, for example us-west-1 or us-west-2. |
awsKinesisStreamName | String | true | "" (empty string) | The Kinesis stream name. |
awsCredentialPluginName | String | false | "" (empty string) | The fully qualified class name of an implementation of AwsCredentialProviderPlugin. This is a factory class that creates the AWSCredentialsProvider used by the Kinesis sink. If it is empty, the Kinesis sink creates a default AWSCredentialsProvider, which accepts a JSON map of credentials in awsCredentialPluginParam. |
awsCredentialPluginParam | String | false | "" (empty string) | The JSON parameter used to initialize the AwsCredentialProviderPlugin. |
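
The constraints in the table can be checked before a configuration is submitted. The following is a minimal sketch, not part of the connector: `validate_kinesis_sink_config` is a hypothetical helper, and it assumes that a missing messageFormat falls back to the documented default rather than being an error.

```python
# Allowed values and the default are taken from the property table above.
MESSAGE_FORMATS = {
    "ONLY_RAW_PAYLOAD",
    "FULL_MESSAGE_IN_JSON",
    "FULL_MESSAGE_IN_FB",
    "FULL_MESSAGE_IN_JSON_EXPAND_VALUE",
}
DEFAULT_MESSAGE_FORMAT = "ONLY_RAW_PAYLOAD"


def validate_kinesis_sink_config(configs):
    """Return a list of problems found in a sink config (hypothetical helper)."""
    problems = []
    # awsKinesisStreamName is required and defaults to an empty string.
    if not configs.get("awsKinesisStreamName"):
        problems.append("missing required property: awsKinesisStreamName")
    # Assumption: an absent messageFormat means the documented default applies.
    fmt = configs.get("messageFormat", DEFAULT_MESSAGE_FORMAT)
    if fmt not in MESSAGE_FORMATS:
        problems.append(f"unknown messageFormat: {fmt}")
    return problems


print(validate_kinesis_sink_config({"awsKinesisStreamName": "my-stream"}))
```

An empty list means the sketch found nothing to report; it does not check AWS-side details such as whether the stream exists.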
Built-in plugins
The following are built-in AwsCredentialProviderPlugin plugins:
- org.apache.pulsar.io.aws.AwsDefaultProviderChainPlugin
  This plugin takes no configuration and uses the default AWS provider chain. For more information, see the AWS documentation.
- org.apache.pulsar.io.aws.STSAssumeRoleProviderPlugin
  This plugin takes a configuration (via awsCredentialPluginParam) that describes a role to assume when running the KCL. The configuration is a small JSON document, for example:
    {"roleArn": "arn...", "roleSessionName": "name"}
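
Because awsCredentialPluginParam is a JSON document stored inside a string, hand-escaping it is error-prone. The sketch below builds a sink configuration for STSAssumeRoleProviderPlugin with Python's standard json module. The role ARN and session name are the placeholders from the text above, and the region and stream name are illustrative values, not recommendations.

```python
import json

# Placeholder values from the text above; substitute a real role ARN and session name.
role_param = {"roleArn": "arn...", "roleSessionName": "name"}

configs = {
    "awsRegion": "us-east-1",                # illustrative value
    "awsKinesisStreamName": "my-stream",     # illustrative value
    "awsCredentialPluginName": "org.apache.pulsar.io.aws.STSAssumeRoleProviderPlugin",
    # The plugin parameter is itself JSON, serialized to a string.
    "awsCredentialPluginParam": json.dumps(role_param),
}

# Serializing the outer document escapes the inner quotes automatically.
sink_config = json.dumps({"configs": configs}, indent=2)
print(sink_config)
```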
Example
Before using the Kinesis sink connector, you need to create a configuration file in one of the following formats.
- JSON

    {
      "configs": {
        "awsEndpoint": "some.endpoint.aws",
        "awsRegion": "us-east-1",
        "awsKinesisStreamName": "my-stream",
        "awsCredentialPluginParam": "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}",
        "messageFormat": "ONLY_RAW_PAYLOAD",
        "retainOrdering": "true"
      }
    }

- YAML

    configs:
      awsEndpoint: "some.endpoint.aws"
      awsRegion: "us-east-1"
      awsKinesisStreamName: "my-stream"
      awsCredentialPluginParam: "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}"
      messageFormat: "ONLY_RAW_PAYLOAD"
      retainOrdering: "true"
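
A quick way to confirm that the escaping in a configuration file is correct is to parse it twice: once for the file, and once for the credential string nested inside it. The sketch below inlines the JSON example from this section. `decode_credentials` is a hypothetical helper name, and the check runs locally without contacting AWS.

```python
import json

# The JSON example from this section, inlined so the sketch is self-contained.
# A raw string keeps the backslash escapes exactly as they appear in the file.
CONFIG_TEXT = r"""
{
  "configs": {
    "awsEndpoint": "some.endpoint.aws",
    "awsRegion": "us-east-1",
    "awsKinesisStreamName": "my-stream",
    "awsCredentialPluginParam": "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}",
    "messageFormat": "ONLY_RAW_PAYLOAD",
    "retainOrdering": "true"
  }
}
"""


def decode_credentials(config_text):
    """Parse the config, then parse the credential parameter nested inside it."""
    configs = json.loads(config_text)["configs"]
    # The first parse yields a string; the second turns it into a map.
    return json.loads(configs["awsCredentialPluginParam"])


credentials = decode_credentials(CONFIG_TEXT)
print(sorted(credentials))
```

If the inner quotes are not escaped correctly, the first or second `json.loads` call raises `json.JSONDecodeError`, which points at the offending position.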