File source connector
The File source connector pulls messages from files in directories and persists the messages to Pulsar topics.
Configuration
The configuration of the File source connector has the following properties.
Property​
Name | Type | Required | Default | Description |
---|---|---|---|---|
inputDirectory | String | true | No default value | The input directory from which files are pulled. |
recurse | Boolean | false | true | Whether to pull files from subdirectories. |
keepFile | Boolean | false | false | If set to true, the file is not deleted after it is processed, which means the file can be picked up continually. |
fileFilter | String | false | [^\.].* | Only files whose names match the given regular expression are picked up. |
pathFilter | String | false | NULL | If recurse is set to true, only subdirectories whose paths match the given regular expression are scanned. |
minimumFileAge | Integer | false | 0 | The minimum age a file must reach before it can be processed. Any file younger than minimumFileAge (according to its last modification date) is ignored. |
maximumFileAge | Long | false | Long.MAX_VALUE | The maximum age a file can have and still be processed. Any file older than maximumFileAge (according to its last modification date) is ignored. |
minimumSize | Integer | false | 1 | The minimum size (in bytes) a file must have to be processed. |
maximumSize | Double | false | Double.MAX_VALUE | The maximum size (in bytes) a file can have and still be processed. |
ignoreHiddenFiles | Boolean | false | true | Whether hidden files are ignored. |
pollingInterval | Long | false | 10000L | Indicates how long to wait before performing a directory listing. |
numWorkers | Integer | false | 1 | The number of worker threads that process files. This allows you to process a larger number of files concurrently. However, setting this to a value greater than 1 interleaves data from multiple files in the target topic. |
processedFileSuffix | String | false | NULL | If set, a processed file is renamed with this suffix instead of being deleted. This setting only takes effect when keepFile is false. |
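The filter properties in the table combine: a file is picked up only if it passes every one of them. The following Python sketch illustrates that combination. It is not the connector's implementation; the `FileInfo` class and `is_selected` function are hypothetical names, the regular expression is assumed to have to match the whole file name, and file ages are assumed to be in milliseconds.

```python
import re
from dataclasses import dataclass


@dataclass
class FileInfo:
    """Hypothetical stand-in for the file attributes the filters look at."""
    name: str
    size_bytes: int
    age_ms: int          # time since last modification; the unit is an assumption
    hidden: bool = False


def is_selected(f, config):
    """Return True if the file passes every filter in `config` (illustrative only)."""
    if config.get("ignoreHiddenFiles", True) and f.hidden:
        return False
    # Assumption: the pattern must match the entire file name.
    if not re.fullmatch(config.get("fileFilter", r"[^\.].*"), f.name):
        return False
    if f.size_bytes < config.get("minimumSize", 1):
        return False
    if f.size_bytes > config.get("maximumSize", float("inf")):
        return False
    if f.age_ms < config.get("minimumFileAge", 0):
        return False
    if f.age_ms > config.get("maximumFileAge", float("inf")):
        return False
    return True


config = {"fileFilter": r"[^\.].*", "minimumSize": 1, "maximumSize": 5000000}
print(is_selected(FileInfo("test.txt", 13, 1000), config))    # True
print(is_selected(FileInfo(".bashrc", 120, 1000), config))    # False: name starts with a dot
print(is_selected(FileInfo("empty.log", 0, 1000), config))    # False: smaller than minimumSize
```

With the default fileFilter, any name that begins with a dot is rejected, and the default minimumSize of 1 skips empty files.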
Example
Before using the File source connector, you need to create a configuration file through one of the following methods.
-
JSON
{
    "configs": {
        "inputDirectory": "/Users/david",
        "recurse": true,
        "keepFile": true,
        "fileFilter": "[^\\.].*",
        "pathFilter": ".*",
        "minimumFileAge": 0,
        "maximumFileAge": 9999999999,
        "minimumSize": 1,
        "maximumSize": 5000000,
        "ignoreHiddenFiles": true,
        "pollingInterval": 5000,
        "numWorkers": 1,
        "processedFileSuffix": ".processed_done"
    }
}
-
YAML
configs:
    inputDirectory: "/Users/david"
    recurse: true
    keepFile: true
    fileFilter: "[^\\.].*"
    pathFilter: ".*"
    minimumFileAge: 0
    maximumFileAge: 9999999999
    minimumSize: 1
    maximumSize: 5000000
    ignoreHiddenFiles: true
    pollingInterval: 5000
    numWorkers: 1
    processedFileSuffix: ".processed_done"
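A configuration can be checked against the property descriptions before it is handed to the connector. The sketch below is a hypothetical helper, not part of Pulsar: `check_config` and its messages are invented for illustration, and it encodes only rules stated in the property table. Note that the example above sets both keepFile and processedFileSuffix, so according to the table the suffix would not take effect.

```python
import json


def check_config(configs):
    """Return a list of problems found in a `configs` mapping (illustrative only)."""
    problems = []
    # inputDirectory is the only required property.
    if not configs.get("inputDirectory"):
        problems.append("inputDirectory is required")
    # processedFileSuffix is documented to work only when keepFile is false.
    if configs.get("keepFile", False) and configs.get("processedFileSuffix"):
        problems.append("processedFileSuffix has no effect while keepFile is true")
    if configs.get("minimumSize", 1) > configs.get("maximumSize", float("inf")):
        problems.append("minimumSize exceeds maximumSize")
    return problems


example = json.loads("""
{
    "configs": {
        "inputDirectory": "/Users/david",
        "keepFile": true,
        "processedFileSuffix": ".processed_done"
    }
}
""")

for problem in check_config(example["configs"]):
    print(problem)
```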
Usage
Here is an example of using the File source connector.
-
Pull a Pulsar image.
$ docker pull apachepulsar/pulsar:{version}
-
Start Pulsar standalone.
$ docker run -d -it -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-standalone apachepulsar/pulsar:{version} bin/pulsar standalone
-
Create a configuration file file-connector.yaml.
configs:
    inputDirectory: "/opt"
-
Copy the configuration file file-connector.yaml to the container.
$ docker cp connectors/file-connector.yaml pulsar-standalone:/pulsar/
-
Download the File source connector.
$ curl -O https://mirrors.tuna.tsinghua.edu.cn/apache/pulsar/pulsar-{version}/connectors/pulsar-io-file-{version}.nar
-
Copy it to the connectors folder, then restart the container.
$ docker cp pulsar-io-file-{version}.nar pulsar-standalone:/pulsar/connectors/
$ docker restart pulsar-standalone
-
Start the File source connector.
$ docker exec -it pulsar-standalone /bin/bash
$ ./bin/pulsar-admin sources localrun \
    --archive /pulsar/connectors/pulsar-io-file-{version}.nar \
    --name file-test \
    --destination-topic-name pulsar-file-test \
    --source-config-file /pulsar/file-connector.yaml
-
Start a consumer.
$ ./bin/pulsar-client consume -s file-test -n 0 pulsar-file-test
-
Write the message to the file test.txt.
$ echo "hello world!" > /opt/test.txt
The following information appears on the consumer terminal window.
----- got message -----
hello world!