
Pulsar's WebSocket API

Pulsar's WebSocket API provides a simple way to interact with Pulsar from languages that do not have an official client library. Through WebSockets you can publish and consume messages and use all the features available in the Java, Python, and C++ client libraries.

You can use Pulsar's WebSocket API with any WebSocket client library. See examples for Python and Node.js below.

Running the WebSocket service

The standalone variant of Pulsar that we recommend using for local development already has the WebSocket service enabled.

In non-standalone mode, there are two ways to deploy the WebSocket service:

Embedded with a Pulsar broker

In this mode, the WebSocket service runs within the same HTTP service that's already running in the broker. To enable this mode, set the webSocketServiceEnabled parameter to true in the conf/broker.conf configuration file in your installation.


webSocketServiceEnabled=true

As a separate component

In this mode, the WebSocket service runs as a separate service, independently of the Pulsar broker. Configuration for this mode is handled in the conf/websocket.conf configuration file. You'll need to set at least the configurationStoreServers, webServicePort, and clusterName parameters.

Here's an example:


configurationStoreServers=zk1:2181,zk2:2181,zk3:2181
webServicePort=8080
clusterName=my-cluster

Starting the WebSocket service

When the configuration is set, you can start the service using the pulsar-daemon tool:


$ bin/pulsar-daemon start websocket

API Reference

Pulsar's WebSocket API offers three endpoints: one for producing messages, one for consuming messages, and one for reading messages.

All exchanges via the WebSocket API use JSON.

Producer endpoint

The producer endpoint requires you to specify a tenant, namespace, and topic in the URL:


ws://broker-service-url:8080/ws/v2/producer/persistent/:tenant/:namespace/:topic

Query param
Key | Type | Required? | Explanation
sendTimeoutMillis | long | no | Send timeout (default: 30 secs)
batchingEnabled | boolean | no | Enable batching of messages (default: false)
batchingMaxMessages | int | no | Maximum number of messages permitted in a batch (default: 1000)
maxPendingMessages | int | no | Maximum size of the internal queue holding the messages (default: 1000)
batchingMaxPublishDelay | long | no | Time period within which the messages will be batched (default: 10ms)
messageRoutingMode | string | no | Message routing mode for the partitioned producer: SinglePartition, RoundRobinPartition
compressionType | string | no | Compression type: LZ4, ZLIB
producerName | string | no | Name for the producer. Pulsar enforces that only one producer with a given name can publish on a topic
initialSequenceId | long | no | Baseline for the sequence IDs of messages published by the producer
hashingScheme | string | no | Hashing function to use when publishing on a partitioned topic: JavaStringHash, Murmur3_32Hash
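
The query parameters above are appended to the producer URL as an ordinary query string. The following is a minimal sketch of how such a URL could be assembled; the helper name producer_url is hypothetical, and writing booleans as lower-case true/false is an assumption rather than something this page specifies.

```python
from urllib.parse import quote, urlencode


def producer_url(host, tenant, namespace, topic, **params):
    """Build a v2 producer endpoint URL with optional query parameters.

    Hypothetical helper for illustration; not part of any Pulsar library.
    """
    path = "/ws/v2/producer/persistent/{}/{}/{}".format(
        quote(tenant, safe=""), quote(namespace, safe=""), quote(topic, safe="")
    )
    # Assumption: booleans are written as lower-case true/false.
    query = urlencode(
        {k: str(v).lower() if isinstance(v, bool) else v for k, v in params.items()}
    )
    return "ws://{}{}{}".format(host, path, "?" + query if query else "")


url = producer_url(
    "localhost:8080", "public", "default", "my-topic",
    batchingEnabled=True, batchingMaxMessages=500, compressionType="LZ4",
)
print(url)
```

The resulting string can be passed to any WebSocket client in place of a hand-written URL.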

Publishing a message


{
"payload": "SGVsbG8gV29ybGQ=",
"properties": {"key1": "value1", "key2": "value2"},
"context": "1"
}

Key | Type | Required? | Explanation
payload | string | yes | Base-64 encoded payload
properties | key-value pairs | no | Application-defined properties
context | string | no | Application-defined request identifier
key | string | no | For partitioned topics, decides which partition to use
replicationClusters | array | no | Restrict replication to this list of clusters, specified by name
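
As a rough illustration of the fields listed above, a publish request could be assembled as follows. The function name build_publish_message is hypothetical; the sketch only sets the optional fields that are passed in and converts the context to a string, since the table describes it as one.

```python
import base64
import json


def build_publish_message(payload, properties=None, context=None, key=None):
    """Return the JSON text for one publish request (hypothetical helper)."""
    # The payload must be Base-64 encoded; b64encode() returns bytes,
    # so it is decoded to str before JSON serialization.
    message = {"payload": base64.b64encode(payload).decode("ascii")}
    if properties:
        message["properties"] = properties
    if context is not None:
        message["context"] = str(context)
    if key is not None:
        message["key"] = key
    return json.dumps(message)


frame = build_publish_message(b"Hello World", {"key1": "value1"}, context=1)
print(frame)
```
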

Example success response

{
"result": "ok",
"messageId": "CAAQAw==",
"context": "1"
}

Example failure response

{
"result": "send-error:3",
"errorMsg": "Failed to de-serialize from JSON",
"context": "1"
}

Key | Type | Required? | Explanation
result | string | yes | ok if successful or an error message if unsuccessful
messageId | string | yes | Message ID assigned to the published message
context | string | no | Application-defined request identifier
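
A client usually needs to match each response to the request it sent and to tell success from failure. The sketch below shows one way to do that under stated assumptions: the function name parse_producer_response is hypothetical, and the failure format "label:numeric code" is inferred only from the send-error:3 example above.

```python
import json


def parse_producer_response(raw):
    """Return (ok, context, detail) for one producer response frame.

    On success, detail is the message ID. On failure, detail is a tuple
    (error_code, error_message), where error_code may be None.
    """
    response = json.loads(raw)
    result = response.get("result", "")
    context = response.get("context")
    if result == "ok":
        return True, context, response.get("messageId")
    # Assumed failure shape: "<label>:<numeric code>", as in "send-error:3".
    _, _, code = result.rpartition(":")
    error_code = int(code) if code.isdigit() else None
    return False, context, (error_code, response.get("errorMsg"))
```

The returned context lets the caller correlate the response with the publish request that carried the same value.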

Consumer endpoint

The consumer endpoint requires you to specify a tenant, namespace, and topic, as well as a subscription, in the URL:


ws://broker-service-url:8080/ws/v2/consumer/persistent/:tenant/:namespace/:topic/:subscription

Query param
Key | Type | Required? | Explanation
ackTimeoutMillis | long | no | Set the timeout for unacked messages (default: 0)
subscriptionType | string | no | Subscription type: Exclusive, Failover, Shared
receiverQueueSize | int | no | Size of the consumer receive queue (default: 1000)
consumerName | string | no | Consumer name
priorityLevel | int | no | Define a priority for the consumer

Receiving messages

The server pushes messages on the WebSocket session:


{
"messageId": "CAAQAw==",
"payload": "SGVsbG8gV29ybGQ=",
"properties": {"key1": "value1", "key2": "value2"},
"publishTime": "2016-08-30 16:45:57.785"
}

Key | Type | Required? | Explanation
messageId | string | yes | Message ID
payload | string | yes | Base-64 encoded payload
publishTime | string | yes | Publish timestamp
properties | key-value pairs | no | Application-defined properties
key | string | no | Original routing key set by producer

Acknowledging the message

The consumer needs to acknowledge the successful processing of each message so that the Pulsar broker can delete it.


{
"messageId": "CAAQAw=="
}

Key | Type | Required? | Explanation
messageId | string | yes | Message ID of the processed message
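
The receive-then-acknowledge exchange described above can be sketched as a single function that decodes a pushed message and prepares the matching acknowledgement. The name handle_message is hypothetical, and the timestamp layout is an assumption taken from the sample message (which carries no time zone), so the parsed value is a naive datetime.

```python
import base64
import json
from datetime import datetime


def handle_message(raw):
    """Decode one pushed message and return (payload, publish_time, ack_frame)."""
    msg = json.loads(raw)
    payload = base64.b64decode(msg["payload"])
    # Assumed layout, based on the sample "2016-08-30 16:45:57.785".
    publish_time = datetime.strptime(msg["publishTime"], "%Y-%m-%d %H:%M:%S.%f")
    # The acknowledgement echoes back the ID of the processed message.
    ack_frame = json.dumps({"messageId": msg["messageId"]})
    return payload, publish_time, ack_frame
```

The caller would send ack_frame back on the same WebSocket session once it has finished processing the payload.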

Reader endpoint

The reader endpoint requires you to specify a tenant, namespace, and topic in the URL:


ws://broker-service-url:8080/ws/v2/reader/persistent/:tenant/:namespace/:topic

Query param
Key | Type | Required? | Explanation
readerName | string | no | Reader name
receiverQueueSize | int | no | Size of the consumer receive queue (default: 1000)
messageId | int or enum | no | Message ID to start from, earliest or latest (default: latest)

Receiving messages

The server pushes messages on the WebSocket session:


{
"messageId": "CAAQAw==",
"payload": "SGVsbG8gV29ybGQ=",
"properties": {"key1": "value1", "key2": "value2"},
"publishTime": "2016-08-30 16:45:57.785"
}

Key | Type | Required? | Explanation
messageId | string | yes | Message ID
payload | string | yes | Base-64 encoded payload
publishTime | string | yes | Publish timestamp
properties | key-value pairs | no | Application-defined properties
key | string | no | Original routing key set by producer

Acknowledging the message

With the WebSocket API, the reader needs to acknowledge the successful processing of each message so that the Pulsar WebSocket service can update the number of pending messages. If you don't send acknowledgements, the Pulsar WebSocket service stops sending messages once it reaches the pendingMessages limit.


{
"messageId": "CAAQAw=="
}

Key | Type | Required? | Explanation
messageId | string | yes | Message ID of the processed message

Error codes

If an error occurs, the server closes the WebSocket session using one of the following error codes:

Error Code | Error Message
1 | Failed to create producer
2 | Failed to subscribe
3 | Failed to deserialize from JSON
4 | Failed to serialize to JSON
5 | Failed to authenticate client
6 | Client is not authorized
7 | Invalid payload encoding
8 | Unknown error

The application is responsible for re-establishing a new WebSocket session after a backoff period.
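
One common way to implement such a backoff is to double the wait after each failed attempt, up to a cap. The sketch below assumes exponential backoff, which this page does not prescribe; the function connect_with_backoff and all of its parameters are hypothetical. The connect argument is any callable that opens a session and raises on failure.

```python
import time


def connect_with_backoff(connect, max_attempts=5, base_delay=0.5,
                         max_delay=30.0, sleep=time.sleep):
    """Call connect() until it succeeds, doubling the wait after each failure.

    Re-raises the last exception once max_attempts is exhausted.
    """
    delay = base_delay
    for attempt in range(1, max_attempts + 1):
        try:
            return connect()
        except Exception:
            if attempt == max_attempts:
                raise
            sleep(delay)
            # Double the delay, but never wait longer than max_delay.
            delay = min(delay * 2, max_delay)
```

With the websocket-client package used in the Python examples, this could be called as connect_with_backoff(lambda: websocket.create_connection(TOPIC)), where TOPIC is the endpoint URL.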

Client examples

Below you'll find code examples for the Pulsar WebSocket API in Python and Node.js.

Python

This example uses the websocket-client package. You can install it using pip:


$ pip install websocket-client

You can also download it from PyPI.

Python producer

Here's an example Python producer that sends a simple message to a Pulsar topic:


import base64
import json

import websocket

# v2 producer endpoint for the topic my-topic in public/default
TOPIC = 'ws://localhost:8080/ws/v2/producer/persistent/public/default/my-topic'

ws = websocket.create_connection(TOPIC)

# Send one message as JSON. b64encode() returns bytes, so decode it to str
# before passing it to json.dumps().
ws.send(json.dumps({
    'payload': base64.b64encode(b'Hello World').decode('utf-8'),
    'properties': {
        'key1': 'value1',
        'key2': 'value2'
    },
    'context': '5'
}))

response = json.loads(ws.recv())
if response['result'] == 'ok':
    print('Message published successfully')
else:
    print('Failed to publish message:', response)
ws.close()

Python consumer

Here's an example Python consumer that listens on a Pulsar topic, prints each message and its decoded payload as it arrives, and then acknowledges it:


import base64
import json

import websocket

TOPIC = 'ws://localhost:8080/ws/v2/consumer/persistent/public/default/my-topic/my-sub'

ws = websocket.create_connection(TOPIC)

while True:
    msg = json.loads(ws.recv())
    if not msg:
        break

    print('Received: {} - payload: {}'.format(msg, base64.b64decode(msg['payload'])))

    # Acknowledge successful processing
    ws.send(json.dumps({'messageId': msg['messageId']}))

ws.close()

Python reader

Here's an example Python reader that listens on a Pulsar topic, prints each message and its decoded payload as it arrives, and then acknowledges it:


import base64
import json

import websocket

TOPIC = 'ws://localhost:8080/ws/v2/reader/persistent/public/default/my-topic'

ws = websocket.create_connection(TOPIC)

while True:
    msg = json.loads(ws.recv())
    if not msg:
        break

    print('Received: {} - payload: {}'.format(msg, base64.b64decode(msg['payload'])))

    # Acknowledge successful processing
    ws.send(json.dumps({'messageId': msg['messageId']}))

ws.close()

Node.js

This example uses the ws package. You can install it using npm:


$ npm install ws

Node.js producer

Here's an example Node.js producer that sends a simple message to a Pulsar topic:


var WebSocket = require('ws'),
    topic = "ws://localhost:8080/ws/v2/producer/persistent/public/default/my-topic",
    ws = new WebSocket(topic);

var message = {
  // Buffer.from() replaces the deprecated new Buffer() constructor
  "payload" : Buffer.from("Hello World").toString('base64'),
  "properties": {
    "key1" : "value1",
    "key2" : "value2"
  },
  "context" : "1"
};

ws.on('open', function() {
  // Send one message
  ws.send(JSON.stringify(message));
});

ws.on('message', function(message) {
  console.log('received ack: %s', message.toString());
});

Node.js consumer

Here's an example Node.js consumer that listens on the same topic used by the producer above:


var WebSocket = require('ws'),
    topic = "ws://localhost:8080/ws/v2/consumer/persistent/public/default/my-topic/my-sub",
    ws = new WebSocket(topic);

ws.on('message', function(message) {
  var receiveMsg = JSON.parse(message);
  // Buffer.from() replaces the deprecated new Buffer() constructor
  console.log('Received: %s - payload: %s', message.toString(), Buffer.from(receiveMsg.payload, 'base64').toString());
  // Acknowledge successful processing
  var ackMsg = {"messageId" : receiveMsg.messageId};
  ws.send(JSON.stringify(ackMsg));
});

Node.js reader

Here's an example Node.js reader that reads from the same topic used by the producer above:


var WebSocket = require('ws'),
    topic = "ws://localhost:8080/ws/v2/reader/persistent/public/default/my-topic",
    ws = new WebSocket(topic);

ws.on('message', function(message) {
  var receiveMsg = JSON.parse(message);
  // Buffer.from() replaces the deprecated new Buffer() constructor
  console.log('Received: %s - payload: %s', message.toString(), Buffer.from(receiveMsg.payload, 'base64').toString());
  // Acknowledge successful processing
  var ackMsg = {"messageId" : receiveMsg.messageId};
  ws.send(JSON.stringify(ackMsg));
});