HDFS2 sink connector
You can download all the Pulsar connectors on the download page.
The HDFS2 sink connector pulls messages from Pulsar topics and persists them to HDFS files.
Configuration
The configuration of the HDFS2 sink connector has the following properties.
Property
Name | Type | Required | Default | Description |
---|---|---|---|---|
hdfsConfigResources | String | true | None | A file or a comma-separated list of files containing the Hadoop file system configuration. Examples: 'core-site.xml', 'hdfs-site.xml'. |
directory | String | true | None | The HDFS directory where files are read from or written to. |
encoding | String | false | None | The character encoding for the files. Examples: UTF-8, ASCII. |
compression | Compression | false | None | The compression codec used to compress or decompress the files on HDFS. Example: SNAPPY. |
kerberosUserPrincipal | String | false | None | The principal account of Kerberos user used for authentication. |
keytab | String | false | None | The full pathname of the Kerberos keytab file used for authentication. |
filenamePrefix | String | true | None | The prefix of the files created inside the HDFS directory. Example: a value of topicA results in files named topicA-. |
fileExtension | String | true, if compression is set to None. | None | The extension added to the files written to HDFS. Examples: '.txt', '.seq'. |
separator | char | false | None | The character used to separate records in a text file. If no value is provided, the contents from all records are concatenated together in one continuous byte array. |
syncInterval | long | false | 0 | The interval, in milliseconds, between calls to flush data to HDFS disk. |
maxPendingRecords | int | false | Integer.MAX_VALUE | The maximum number of records held in memory before acking. Setting this property to 1 sends every record to disk before the record is acked. Setting this property to a higher value allows records to be buffered before they are flushed to disk. |
subdirectoryPattern | String | false | None | A subdirectory associated with the creation time of the sink. The value is the date-time pattern used to format the name of the subdirectory under directory. See DateTimeFormatter for the pattern syntax. |
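To make the subdirectoryPattern property more concrete, the following sketch shows how a DateTimeFormatter pattern turns a timestamp into a subdirectory name under directory. The class name, the resolveDirectory helper, and the fixed timestamp are assumptions made for illustration; this is not the connector's actual code, and the connector may join paths differently.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class SubdirectoryPatternSketch {

    // Hypothetical helper: appends a subdirectory, formatted from the given
    // timestamp, to the configured directory. Illustration only.
    static String resolveDirectory(String directory, String subdirectoryPattern,
                                   LocalDateTime createdAt) {
        if (subdirectoryPattern == null || subdirectoryPattern.isEmpty()) {
            // No pattern configured: files go directly into the directory.
            return directory;
        }
        String subdirectory = DateTimeFormatter.ofPattern(subdirectoryPattern).format(createdAt);
        return directory + "/" + subdirectory;
    }

    public static void main(String[] args) {
        // Fixed timestamp so the output is reproducible.
        LocalDateTime createdAt = LocalDateTime.of(2024, 3, 15, 10, 30);
        System.out.println(resolveDirectory("/foo/bar", "yyyy-MM-dd", createdAt));    // /foo/bar/2024-03-15
        System.out.println(resolveDirectory("/foo/bar", "yyyy/MM/dd-HH", createdAt)); // /foo/bar/2024/03/15-10
    }
}
```

Trying a pattern this way before deploying the sink is a quick check that it is valid and yields the directory layout you expect.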
Example
Before using the HDFS2 sink connector, you need to create a configuration file through one of the following methods.
JSON
{
    "configs": {
        "hdfsConfigResources": "core-site.xml",
        "directory": "/foo/bar",
        "filenamePrefix": "prefix",
        "fileExtension": ".log",
        "compression": "SNAPPY",
        "subdirectoryPattern": "yyyy-MM-dd"
    }
}

YAML
configs:
    hdfsConfigResources: "core-site.xml"
    directory: "/foo/bar"
    filenamePrefix: "prefix"
    fileExtension: ".log"
    compression: "SNAPPY"
    subdirectoryPattern: "yyyy-MM-dd"
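
Before submitting a configuration like the ones above, it can help to check it against the Required column of the property table. The following is a minimal sketch of such a pre-flight check. The class and method names are hypothetical, and the connector's own validation may be stricter or behave differently.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class Hdfs2SinkConfigCheck {

    // Hypothetical check derived from the property table: returns the names
    // of required properties that are missing or blank.
    static List<String> missingProperties(Map<String, Object> configs) {
        List<String> missing = new ArrayList<>();
        for (String key : new String[] {"hdfsConfigResources", "directory", "filenamePrefix"}) {
            if (isBlank(configs.get(key))) {
                missing.add(key);
            }
        }
        // fileExtension is required only when no compression is configured.
        if (isBlank(configs.get("compression")) && isBlank(configs.get("fileExtension"))) {
            missing.add("fileExtension");
        }
        return missing;
    }

    static boolean isBlank(Object value) {
        return value == null || value.toString().trim().isEmpty();
    }

    public static void main(String[] args) {
        // Same values as the example configuration.
        Map<String, Object> configs = new LinkedHashMap<>();
        configs.put("hdfsConfigResources", "core-site.xml");
        configs.put("directory", "/foo/bar");
        configs.put("filenamePrefix", "prefix");
        configs.put("fileExtension", ".log");
        configs.put("compression", "SNAPPY");
        configs.put("subdirectoryPattern", "yyyy-MM-dd");
        System.out.println(missingProperties(configs)); // []

        // Without compression and without fileExtension, fileExtension is reported.
        configs.remove("compression");
        configs.remove("fileExtension");
        System.out.println(missingProperties(configs)); // [fileExtension]
    }
}
```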