This hands-on tutorial provides step-by-step instructions and examples on how to create and validate functions in a standalone Pulsar, including stateful functions and window functions.
Enable Pulsar Functions in conf/standalone.conf (add this field if it does not exist):
functionsWorkerEnabled=true
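If you prefer to script this change, the following is a minimal sketch rather than an official tool. The helper name enable_functions_worker is hypothetical; it assumes a POSIX shell with grep and sed available, and it keeps a .bak copy of the file when it rewrites an existing value.

```shell
# Hypothetical helper: make sure functionsWorkerEnabled=true is set in a conf file.
enable_functions_worker() {
  conf_file="$1"
  if grep -q '^functionsWorkerEnabled=' "$conf_file"; then
    # The field exists: rewrite its value in place (a .bak copy is kept).
    sed -i.bak 's/^functionsWorkerEnabled=.*/functionsWorkerEnabled=true/' "$conf_file"
  else
    # The field is missing: append it on its own line.
    printf '\nfunctionsWorkerEnabled=true\n' >> "$conf_file"
  fi
}

# Usage, from the Pulsar directory:
# enable_functions_worker conf/standalone.conf
```

Running the helper twice leaves a single functionsWorkerEnabled line, so it is safe to call before every start.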
Start Pulsar locally.
bin/pulsar standalone
All the components (including ZooKeeper, BookKeeper, broker, and so on) of a Pulsar service start in order. You can use the bin/pulsar-admin brokers healthcheck command to make sure the Pulsar service is up and running.
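Because the components take a moment to start, it can help to retry the healthcheck instead of running it once. The sketch below is one way to do that. The helper name wait_for_pulsar, the HEALTHCHECK_CMD variable, and the default of 30 attempts with a 2-second pause are all assumptions made for illustration; it expects sh or bash, where the unquoted variable is split into words.

```shell
# Command used to probe the broker; defaults to the healthcheck named above.
HEALTHCHECK_CMD="${HEALTHCHECK_CMD:-bin/pulsar-admin brokers healthcheck}"

# Hypothetical helper: retry the healthcheck until it succeeds or attempts run out.
wait_for_pulsar() {
  max_attempts="${1:-30}"   # assumed default: 30 tries
  delay_seconds="${2:-2}"   # assumed default: 2 seconds between tries
  attempt=1
  while [ "$attempt" -le "$max_attempts" ]; do
    # Left unquoted on purpose so the command string splits into words.
    if $HEALTHCHECK_CMD >/dev/null 2>&1; then
      echo "Pulsar is up (attempt $attempt)"
      return 0
    fi
    sleep "$delay_seconds"
    attempt=$((attempt + 1))
  done
  echo "Pulsar did not become healthy after $max_attempts attempts" >&2
  return 1
}

# Usage, from the Pulsar directory:
# wait_for_pulsar 30 2
```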
You can see both the example-function-config.yaml and api-examples.jar files under the examples folder of the Pulsar directory on your local machine.
This example function will add a ! at the end of every message.
In a new terminal window, produce messages to the input topic test_src.
bin/pulsar-client produce -m "test-messages-`date`" -n 10 test_src
In the same terminal window as step 1, the messages produced by the example function are returned. You can see there is a ! added at the end of every message.
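To make the expected output concrete, here is a shell stand-in for the transformation. It is not the source of the example function, only an illustration of the behavior described above; the name add_exclamation is hypothetical.

```shell
# Stand-in for the example function's behavior, not its actual source:
# return the input message with a trailing "!".
add_exclamation() {
  printf '%s!\n' "$1"
}

add_exclamation "test-messages-1"   # prints: test-messages-1!
```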
inputs: ["test_wordcount_src"] # this function will read messages from these topics
autoAck: true
parallelism: 1
You can see the source code of WordCountFunction. This function does not return a value. Instead, it stores the number of occurrences of each word in the function context, so you do not need to specify an output topic.
For more information about the yaml config, see the reference.
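As a sketch of how the fields above fit into a complete config, the following helpers write a config file and register the function from it. The tenant, namespace, and function name are taken from the commands used later in this tutorial. The className value, the config file path, and the helper names are assumptions, so check them against your Pulsar distribution before relying on them.

```shell
PULSAR_ADMIN="${PULSAR_ADMIN:-bin/pulsar-admin}"

# Hypothetical helper: write a complete function config to the given path.
write_word_count_config() {
  cat > "$1" <<'EOF'
tenant: "test"
namespace: "test-namespace"
name: "word_count"
# Assumed class name; check it against the WordCountFunction source.
className: "org.apache.pulsar.functions.api.examples.WordCountFunction"
inputs: ["test_wordcount_src"] # this function will read messages from these topics
autoAck: true
parallelism: 1
EOF
}

# Hypothetical helper: register the function from that config and the examples jar.
create_word_count() {
  "$PULSAR_ADMIN" functions create \
    --function-config-file "$1" \
    --jar examples/api-examples.jar
}

# Usage, from the Pulsar directory (the config path is an assumption):
# write_word_count_config /tmp/word-count-config.yaml
# create_word_count /tmp/word-count-config.yaml
```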
In the same terminal window as step 1, get the information of the word_count function.
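A likely form of this step, reusing the tenant, namespace, and name from the status command in this tutorial, is sketched below. The wrapper name get_word_count_info is hypothetical, and the command is wrapped in a function only so that it can be reused.

```shell
PULSAR_ADMIN="${PULSAR_ADMIN:-bin/pulsar-admin}"

# Hypothetical wrapper: print the stored configuration of the word_count function.
get_word_count_info() {
  "$PULSAR_ADMIN" functions get \
    --tenant test \
    --namespace test-namespace \
    --name word_count
}

# Usage, from the Pulsar directory:
# get_word_count_info
```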
In the same terminal window as step 1, get the status of the word_count function.
bin/pulsar-admin functions status \
--tenant test \
--namespace test-namespace \
--name word_count
Output
{
  "numInstances": 1,
  "numRunning": 1,
  "instances": [{
    "instanceId": 0,
    "status": {
      "running": true,
      "error": "",
      "numRestarts": 0,
      "numReceived": 0,
      "numSuccessfullyProcessed": 0,
      "numUserExceptions": 0,
      "latestUserExceptions": [],
      "numSystemExceptions": 0,
      "latestSystemExceptions": [],
      "averageLatency": 0.0,
      "lastInvocationTime": 0,
      "workerId": "c-standalone-fw-localhost-8080"
    }
  }]
}
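If you want to check this output in a script instead of reading it by eye, one option is to compare numRunning with numInstances. The sketch below uses only sed and head, so it is a rough text match rather than a real JSON parser. The helper name all_instances_running is hypothetical.

```shell
# Hypothetical helper: read `functions status` JSON on stdin and succeed only
# when numRunning equals numInstances.
all_instances_running() {
  status_json=$(cat)
  instances=$(printf '%s\n' "$status_json" |
    sed -n 's/.*"numInstances" *: *\([0-9][0-9]*\).*/\1/p' | head -n 1)
  running=$(printf '%s\n' "$status_json" |
    sed -n 's/.*"numRunning" *: *\([0-9][0-9]*\).*/\1/p' | head -n 1)
  # Fail when the field is missing or the two counts differ.
  [ -n "$instances" ] && [ "$instances" = "$running" ]
}

# Usage, from the Pulsar directory:
# bin/pulsar-admin functions status --tenant test --namespace test-namespace \
#   --name word_count | all_instances_running && echo "all instances running"
```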
In the same terminal window as step 1, query the state table for the function with the key hello. This operation watches the changes associated with hello.
bin/pulsar-admin functions querystate \
--tenant test \
--namespace test-namespace \
--name word_count -k hello -w
Tip: For more information about the pulsar-admin functions querystate options command, including flags, descriptions, default values, and shorthands, see Pulsar admin API.
Output
key 'hello' doesn't exist.
key 'hello' doesn't exist.
key 'hello' doesn't exist.
...
In a new terminal window, produce 10 messages with hello to the input topic test_wordcount_src using the following command. The value of hello is updated to 10.
bin/pulsar-client produce -m "hello" -n 10 test_wordcount_src
In the same terminal window as step 1, check the result.
The result shows that the output topic test_wordcount_dest receives the messages.
Output
{
  "key": "hello",
  "numberValue": 10,
  "version": 9
}
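To read the counter from this output in a script, a small filter such as the one below can pull out numberValue. This is a sketch that assumes the JSON shape shown above and uses a plain text match rather than a JSON parser. The helper name state_number_value is hypothetical.

```shell
# Hypothetical helper: read `functions querystate` JSON on stdin and print numberValue.
state_number_value() {
  sed -n 's/.*"numberValue" *: *\(-\{0,1\}[0-9][0-9]*\).*/\1/p' | head -n 1
}

# Usage, from the Pulsar directory (without -w, so the query is not a watch):
# bin/pulsar-admin functions querystate --tenant test --namespace test-namespace \
#   --name word_count -k hello | state_number_value
```

When the key does not exist yet, the filter prints nothing, which a calling script can treat as a count of zero.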
In the same terminal window as step 5, produce another 10 messages with hello. The value of hello is updated to 20.
bin/pulsar-client produce -m "hello" -n 10 test_wordcount_src
In the same terminal window as step 1, check the result.
The result shows that the output topic test_wordcount_dest receives the value of 20.