RabbitMQ source connector
The RabbitMQ source connector receives messages from RabbitMQ clusters and writes messages to Pulsar topics.
Configurationβ
The configuration of the RabbitMQ source connector has the following properties.
Propertyβ
Name | Type | Required | Default | Description |
---|---|---|---|---|
connectionName | String | true | " " (empty string) | The connection name. |
host | String | true | " " (empty string) | The RabbitMQ host. |
port | int | true | 5672 | The RabbitMQ port. |
virtualHost | String | true | / | The virtual host used to connect to RabbitMQ. |
username | String | false | guest | The username used to authenticate to RabbitMQ. |
password | String | false | guest | The password used to authenticate to RabbitMQ. |
queueName | String | true | " " (empty string) | The RabbitMQ queue name that messages should be read from or written to. |
requestedChannelMax | int | false | 0 | The initially requested maximum channel number. 0 means unlimited. |
requestedFrameMax | int | false | 0 | The initially requested maximum frame size in octets. 0 means unlimited. |
connectionTimeout | int | false | 60000 | The timeout of TCP connection establishment in milliseconds. 0 means infinite. |
handshakeTimeout | int | false | 10000 | The timeout of AMQP0-9-1 protocol handshake in milliseconds. |
requestedHeartbeat | int | false | 60 | The requested heartbeat timeout in seconds. |
prefetchCount | int | false | 0 | The maximum number of messages that the server delivers. 0 means unlimited. |
prefetchGlobal | boolean | false | false | Whether the setting should be applied to the entire channel rather than each consumer. |
passive | boolean | false | false | Whether the rabbitmq consumer should create its own queue or bind to an existing one. |
Exampleβ
Before using the RabbitMQ source connector, you need to create a configuration file through one of the following methods.
-
JSON
{
"host": "localhost",
"port": "5672",
"virtualHost": "/",
"username": "guest",
"password": "guest",
"queueName": "test-queue",
"connectionName": "test-connection",
"requestedChannelMax": "0",
"requestedFrameMax": "0",
"connectionTimeout": "60000",
"handshakeTimeout": "10000",
"requestedHeartbeat": "60",
"prefetchCount": "0",
"prefetchGlobal": "false",
"passive": "false"
} -
YAML
configs:
host: "localhost"
port: 5672
virtualHost: "/"
username: "guest"
password: "guest"
queueName: "test-queue"
connectionName: "test-connection"
requestedChannelMax: 0
requestedFrameMax: 0
connectionTimeout: 60000
handshakeTimeout: 10000
requestedHeartbeat: 60
prefetchCount: 0
prefetchGlobal: "false"
passive: "false"
Usageβ
Standalone modeβ
This example describes how to use the RabbitMQ source connector to feed data from RabbitMQ and write data to Pulsar topics in the standalone mode.
Prerequisitesβ
- There is a RabbitMQ server with some history messages in the queue.
Stepsβ
-
Get a Pulsar package and start Pulsar in standalone mode.
wget https://archive.apache.org/dist/pulsar/pulsar-2.8.4/apache-pulsar-2.8.4-bin.tar.gz
tar xvfz apache-pulsar-2.8.4-bin.tar.gz
cd apache-pulsar-2.8.4
bin/pulsar standalone -
Download the nar package corresponding to Pulsar's version and copy the following file to Pulsar's directory.
wget https://archive.apache.org/dist/pulsar/pulsar-2.8.4/connectors/pulsar-io-rabbitmq-2.8.4.nar
cp pulsar-io-rabbitmq-2.8.4.nar ./connectors -
Messages published to a topic lacking at least one durable subscription are automatically marked as ready for deletion by default. We can set a retention policy at the namespace level to prevent this.
./bin/pulsar-admin namespaces set-retention -s 100M -t 3d public/default
-
Prepare a configuration file with name is rabbitmq-source-queue-name.yaml.
configs:
host: "localhost"
port: 5672
virtualHost: "/"
username: "guest"
password: "guest"
queueName: "test-queue"
connectionName: "test-connection"
requestedChannelMax: 0
requestedFrameMax: 0
connectionTimeout: 60000
handshakeTimeout: 10000
requestedHeartbeat: 60
prefetchCount: 0
prefetchGlobal: "false"
passive: "false"Copy the configuration file to Pulsarβs conf directory.
cp rabbitmq-source-queue-name.yaml ./conf
-
Open a new terminal window and start the connector in local run mode.
./bin/pulsar-admin source localrun \
--source-config-file conf/rabbitmq-source-queue-name.yaml \
--archive connectors/pulsar-io-rabbitmq-2.8.4.nar \
--name rabbitmq-source \
--destination-topic-name pulsar-rabbitmq-test-topic \
--broker-service-url pulsar://{ip}:{port} -
Open a new terminal window and check the topic is created automatically.
./bin/pulsar-admin topics list public/default \
This topic is created automatically as follows:
persistent://public/default/pulsar-rabbitmq-test-topic-partition-0
-
Consume this topic.
./bin/pulsar-client consume persistent://public/default/pulsar-rabbitmq-test-topic-partition-0 -s s1 -n 0 -p Earliest
The following information appears on the consumer terminal window.
----- got message -----
key:[quick.orange.pulsar], properties:[], content:message-topic-O(range) 0
----- got message -----
key:[quick.orange.pulsar], properties:[], content:message-topic-O(range) 1
... ...