Version: Next

AWS DynamoDB source connector

Note

You can download all the Pulsar connectors from the download page.

The DynamoDB source connector pulls data from DynamoDB table streams and persists data into Pulsar.

This connector uses the DynamoDB Streams Kinesis Adapter, which relies on the Kinesis Client Library (KCL) to consume messages. The KCL uses DynamoDB to track the state of consumers and requires CloudWatch access to log metrics.

Configuration

The configuration of the DynamoDB source connector has the following properties.

Property​

| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| initialPositionInStream | InitialPositionInStream | false | LATEST | The position in the stream where the connector starts consuming. Available options: AT_TIMESTAMP (start from the record at or after the specified timestamp), LATEST (start after the most recent data record), TRIM_HORIZON (start from the oldest available data record). |
| startAtTime | Date | false | " " (empty string) | If initialPositionInStream is set to AT_TIMESTAMP, the point in time at which to start consumption. |
| applicationName | String | false | Pulsar IO connector | The name of the KCL application. It must be unique because it is used as the name of the DynamoDB table that tracks state. By default, the application name is included in the user agent string of AWS requests, which helps with troubleshooting, for example, distinguishing requests made by separate connector instances. |
| checkpointInterval | long | false | 60000 | The interval between KCL checkpoints, in milliseconds. |
| backoffTime | long | false | 3000 | The delay between requests, in milliseconds, when the connector encounters a throttling exception from AWS Kinesis. |
| numRetries | int | false | 3 | The number of retries when the connector encounters an exception while trying to set a checkpoint. |
| receiveQueueSize | int | false | 1000 | The maximum number of AWS records that can be buffered inside the connector. Once receiveQueueSize is reached, the connector does not consume any more messages from Kinesis until some messages in the queue are successfully consumed. |
| dynamoEndpoint | String | false | " " (empty string) | The DynamoDB endpoint URL, which can be found in the AWS service endpoints documentation. |
| cloudwatchEndpoint | String | false | " " (empty string) | The CloudWatch endpoint URL, which can be found in the AWS service endpoints documentation. |
| awsEndpoint | String | false | " " (empty string) | The DynamoDB Streams endpoint URL, which can be found in the AWS service endpoints documentation. |
| awsRegion | String | false | " " (empty string) | The AWS region, for example, us-west-1 or us-west-2. |
| awsDynamodbStreamArn | String | true | " " (empty string) | The DynamoDB stream ARN. |
| awsCredentialPluginName | String | false | " " (empty string) | The fully qualified class name of an implementation of AwsCredentialProviderPlugin. See the built-in plugins listed after this table. |
| awsCredentialPluginParam | String | false | " " (empty string) | The JSON parameter used to initialize the AwsCredentialProviderPlugin. |

awsCredentialPluginName is a factory class that creates the AWSCredentialsProvider used by the connector. AwsCredentialProviderPlugin has the following built-in plugins:

  • org.apache.pulsar.io.kinesis.AwsDefaultProviderChainPlugin:
    this plugin uses the default AWS provider chain.
    For more information, see using the default credential provider chain.

  • org.apache.pulsar.io.kinesis.STSAssumeRoleProviderPlugin:
    this plugin takes a configuration through awsCredentialPluginParam that describes a role to assume when running the KCL.
    JSON configuration example:
    {"roleArn": "arn...", "roleSessionName": "name"}

If awsCredentialPluginName is empty, the connector creates a default AWSCredentialsProvider that accepts a JSON map of credentials in awsCredentialPluginParam.
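Because awsCredentialPluginParam is a string that itself contains JSON, the quoting is easy to get wrong when a config file is written by hand. The following Python sketch builds a config that selects STSAssumeRoleProviderPlugin and lets json.dumps produce the inner string. The helper name build_sts_config, the role ARN, and the session name are placeholders for illustration, not values defined by the connector; only the property names come from the table above.

```python
import json

STS_PLUGIN = "org.apache.pulsar.io.kinesis.STSAssumeRoleProviderPlugin"


def build_sts_config(stream_arn, role_arn, session_name, region):
    """Return a source config dict that assumes an IAM role through STS.

    Hypothetical helper: it only arranges documented property names.
    """
    # The plugin parameter must be a JSON *string*, not a nested object.
    plugin_param = json.dumps(
        {"roleArn": role_arn, "roleSessionName": session_name}
    )
    return {
        "configs": {
            "awsRegion": region,
            "awsDynamodbStreamArn": stream_arn,
            "awsCredentialPluginName": STS_PLUGIN,
            "awsCredentialPluginParam": plugin_param,
        }
    }


if __name__ == "__main__":
    config = build_sts_config(
        stream_arn="arn:aws:dynamodb:us-west-2:111122223333:table/TestTable/stream/2015-05-11T21:21:33.291",
        role_arn="arn:aws:iam::111122223333:role/example-role",  # placeholder
        session_name="pulsar-dynamodb-source",  # placeholder
        region="us-west-2",
    )
    # Dumping the outer dict escapes the inner quotes automatically.
    print(json.dumps(config, indent=2))
```

Serializing the outer dictionary a second time is what yields the backslash-escaped quotes that appear in hand-written JSON config files.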

Example

Before using the DynamoDB source connector, you need to create a configuration file in one of the following formats.

  • JSON

      {
        "configs": {
          "awsEndpoint": "https://some.endpoint.aws",
          "awsRegion": "us-east-1",
          "awsDynamodbStreamArn": "arn:aws:dynamodb:us-west-2:111122223333:table/TestTable/stream/2015-05-11T21:21:33.291",
          "awsCredentialPluginParam": "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}",
          "applicationName": "My test application",
          "checkpointInterval": "30000",
          "backoffTime": "4000",
          "numRetries": "3",
          "receiveQueueSize": 2000,
          "initialPositionInStream": "TRIM_HORIZON",
          "startAtTime": "2019-03-05T19:28:58.000Z"
        }
      }
  • YAML

      configs:
        awsEndpoint: "https://some.endpoint.aws"
        awsRegion: "us-east-1"
        awsDynamodbStreamArn: "arn:aws:dynamodb:us-west-2:111122223333:table/TestTable/stream/2015-05-11T21:21:33.291"
        awsCredentialPluginParam: "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}"
        applicationName: "My test application"
        checkpointInterval: 30000
        backoffTime: 4000
        numRetries: 3
        receiveQueueSize: 2000
        initialPositionInStream: "TRIM_HORIZON"
        startAtTime: "2019-03-05T19:28:58.000Z"
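A quick check before deployment can catch a few mistakes that the property descriptions imply: a missing awsDynamodbStreamArn, an unknown initialPositionInStream, or a startAtTime that has no effect because the position is not AT_TIMESTAMP (the sample configuration sets both TRIM_HORIZON and startAtTime). The Python sketch below is not part of Pulsar. The function check_config and its messages are hypothetical, it assumes the configuration has already been loaded into a dict, and treating a missing startAtTime under AT_TIMESTAMP as an error is an assumption about how the connector behaves.

```python
VALID_POSITIONS = {"AT_TIMESTAMP", "LATEST", "TRIM_HORIZON"}


def check_config(config):
    """Return a list of human-readable problems found in a source config."""
    props = config.get("configs", {})
    problems = []

    # awsDynamodbStreamArn is the only property documented as required.
    if not props.get("awsDynamodbStreamArn"):
        problems.append("awsDynamodbStreamArn is required")

    # LATEST is the documented default when the property is omitted.
    position = props.get("initialPositionInStream", "LATEST")
    if position not in VALID_POSITIONS:
        problems.append(f"unknown initialPositionInStream: {position}")

    has_start_time = bool(props.get("startAtTime"))
    # Assumption: AT_TIMESTAMP is meaningless without a timestamp.
    if position == "AT_TIMESTAMP" and not has_start_time:
        problems.append("AT_TIMESTAMP needs startAtTime")
    # startAtTime is described as applying only to AT_TIMESTAMP.
    if position != "AT_TIMESTAMP" and has_start_time:
        problems.append("startAtTime is only used with AT_TIMESTAMP")

    return problems


if __name__ == "__main__":
    sample = {
        "configs": {
            "awsDynamodbStreamArn": "arn:aws:dynamodb:us-west-2:111122223333:table/TestTable/stream/2015-05-11T21:21:33.291",
            "initialPositionInStream": "TRIM_HORIZON",
            "startAtTime": "2019-03-05T19:28:58.000Z",
        }
    }
    for problem in check_config(sample):
        print(problem)
```

Returning a list instead of raising on the first problem lets a deployment script report every issue in one pass.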