Redis sink connector
You can download all the Pulsar connectors from the download page.
The Redis sink connector pulls messages from Pulsar topics and persists the messages to a Redis database.
Configuration
The configuration of the Redis sink connector has the following properties.
Property
Name | Type | Required | Default | Description |
---|---|---|---|---|
redisHosts | String | true | " " (empty string) | A comma-separated list of Redis hosts to connect to. |
redisPassword | String | false | " " (empty string) | The password used to connect to Redis. |
redisDatabase | int | true | 0 | The Redis database to connect to. |
clientMode | String | false | Standalone | The client mode used when interacting with the Redis cluster. Available options: Standalone, Cluster. |
autoReconnect | boolean | false | true | Whether the Redis client reconnects automatically. |
requestQueue | int | false | 2147483647 | The maximum number of queued requests to Redis. |
tcpNoDelay | boolean | false | false | Whether to enable TCP no-delay. |
keepAlive | boolean | false | false | Whether to enable keepalive on the connection to Redis. |
connectTimeout | long | false | 10000 | The time to wait, in milliseconds, before a connection attempt times out. |
operationTimeout | long | false | 10000 | The time, in milliseconds, before an operation is marked as timed out. |
batchTimeMs | int | false | 1000 | The Redis operation time in milliseconds. |
batchSize | int | false | 200 | The number of records written to the Redis database in one batch. |
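The defaults and required flags in the table can be captured in a short Python sketch that fills in missing optional properties and splits `redisHosts` into host/port pairs. This is only an illustration: the helpers `parse_hosts` and `resolve_config` are hypothetical and not part of Pulsar, and the assumption that every host is written as `host:port` is taken from the example value `localhost:6379`.

```python
# Defaults for the optional properties in the table above.
# The helper names below are hypothetical; they are not part of Pulsar.
OPTIONAL_DEFAULTS = {
    "redisPassword": "",
    "clientMode": "Standalone",
    "autoReconnect": True,
    "requestQueue": 2147483647,
    "tcpNoDelay": False,
    "keepAlive": False,
    "connectTimeout": 10000,
    "operationTimeout": 10000,
    "batchTimeMs": 1000,
    "batchSize": 200,
}

# Properties marked "Required: true" in the table.
REQUIRED = ("redisHosts", "redisDatabase")


def parse_hosts(redis_hosts):
    """Split a comma-separated 'host:port' list into (host, port) tuples."""
    pairs = []
    for entry in redis_hosts.split(","):
        host, _, port = entry.strip().rpartition(":")
        if not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {entry!r}")
        pairs.append((host, int(port)))
    return pairs


def resolve_config(user_config):
    """Check the required properties, then merge user values over the defaults."""
    missing = [key for key in REQUIRED if user_config.get(key) in (None, "")]
    if missing:
        raise ValueError(f"missing required properties: {', '.join(missing)}")
    return {**OPTIONAL_DEFAULTS, **user_config}


print(parse_hosts("localhost:6379"))  # [('localhost', 6379)]
```

A check like this can run before the connector is started, so that a missing `redisHosts` value is reported early rather than at connection time.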
Example
Before using the Redis sink connector, create a configuration file in the directory from which you start the Pulsar service (PULSAR_HOME), using one of the following formats.
JSON
{
    "configs": {
        "redisHosts": "localhost:6379",
        "redisPassword": "mypassword",
        "redisDatabase": "0",
        "clientMode": "Standalone",
        "operationTimeout": "2000",
        "batchSize": "1",
        "batchTimeMs": "1000",
        "connectTimeout": "3000"
    }
}
YAML
configs:
    redisHosts: "localhost:6379"
    redisPassword: "mypassword"
    redisDatabase: 0
    clientMode: "Standalone"
    operationTimeout: 2000
    batchSize: 1
    batchTimeMs: 1000
    connectTimeout: 3000
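One way to produce the JSON file without hand-editing is to serialize the settings from a script. The sketch below assumes the file is named `redis-sink-config.json` (a hypothetical name; the source only names a YAML file, `redis-sink-config.yaml`) and that it is written to the current directory. `write_sink_config` is an illustrative helper, not a Pulsar API.

```python
import json
from pathlib import Path

# Same settings as the JSON example above, nested under "configs".
SINK_CONFIG = {
    "configs": {
        "redisHosts": "localhost:6379",
        "redisPassword": "mypassword",
        "redisDatabase": "0",
        "clientMode": "Standalone",
        "operationTimeout": "2000",
        "batchSize": "1",
        "batchTimeMs": "1000",
        "connectTimeout": "3000",
    }
}


def write_sink_config(directory=".", filename="redis-sink-config.json"):
    """Write SINK_CONFIG as indented JSON and return the path of the file."""
    path = Path(directory) / filename
    path.write_text(json.dumps(SINK_CONFIG, indent=4) + "\n", encoding="utf-8")
    return path
```

Calling `write_sink_config()` from the directory where Pulsar is started places the file alongside the other configuration files.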
Usage
This example shows how to write records to a Redis database using the Pulsar Redis connector.
Start a Redis server.
docker pull redis:5.0.5
docker run -d -p 6379:6379 --name my-redis redis:5.0.5 --requirepass "mypassword"
Start a Pulsar service locally in standalone mode.
bin/pulsar standalone
Make sure the NAR file is available at connectors/pulsar-io-redis-3.3.1.nar.
Start the Pulsar Redis connector in local run mode using one of the following methods.
- Pass a JSON configuration inline with --sink-config, similar to the one shown previously.
bin/pulsar-admin sinks localrun \
--archive $PWD/connectors/pulsar-io-redis-3.3.1.nar \
--tenant public \
--namespace default \
--name my-redis-sink \
--sink-config '{"redisHosts": "localhost:6379","redisPassword": "mypassword","redisDatabase": "0","clientMode": "Standalone","operationTimeout": "3000","batchSize": "1"}' \
--inputs my-redis-topic
- Use the YAML configuration file as shown previously.
bin/pulsar-admin sinks localrun \
--archive $PWD/connectors/pulsar-io-redis-3.3.1.nar \
--tenant public \
--namespace default \
--name my-redis-sink \
--sink-config-file $PWD/redis-sink-config.yaml \
--inputs my-redis-topic
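Quoting a JSON string inside a shell command is easy to get wrong, so the inline `--sink-config` variant can also be assembled programmatically. The following sketch builds the same `localrun` argument list as the first command above, using only the flags shown there. The function name `build_localrun_command` is hypothetical, and the script is assumed to run from PULSAR_HOME so that the relative `bin/pulsar-admin` path resolves.

```python
import json
import shlex
from pathlib import Path


def build_localrun_command(sink_config, nar="pulsar-io-redis-3.3.1.nar"):
    """Return the argv list for `pulsar-admin sinks localrun` with inline JSON."""
    archive = Path.cwd() / "connectors" / nar  # mirrors $PWD/connectors/...
    return [
        "bin/pulsar-admin", "sinks", "localrun",
        "--archive", str(archive),
        "--tenant", "public",
        "--namespace", "default",
        "--name", "my-redis-sink",
        "--sink-config", json.dumps(sink_config),  # JSON passed as one argument
        "--inputs", "my-redis-topic",
    ]


config = {
    "redisHosts": "localhost:6379",
    "redisPassword": "mypassword",
    "redisDatabase": "0",
    "clientMode": "Standalone",
    "operationTimeout": "3000",
    "batchSize": "1",
}
command = build_localrun_command(config)

# Print a shell-safe version of the command for copy and paste.
print(shlex.join(command))
```

Passing `command` to `subprocess.run(command, check=True)` would start the connector without going through a shell, which avoids the quoting problem altogether.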
Publish records to the topic.
bin/pulsar-client produce \
persistent://public/default/my-redis-topic \
-k "streaming" \
-m "Pulsar"
Start a Redis client in Docker.
docker exec -it my-redis redis-cli -a "mypassword"
Check the key/value in Redis.
127.0.0.1:6379> keys *
1) "streaming"
127.0.0.1:6379> get "streaming"
"Pulsar"