Getting started with Pulsar Functions
This hands-on tutorial provides step-by-step instructions and examples on how to create and validate functions in standalone Pulsar, including stateful functions and window functions.
Prerequisites
- JDK 8+. For more details, refer to Pulsar runtime Java version recommendation.
- Windows OS is not supported.
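You can check the JDK prerequisite from a script instead of reading the java -version output yourself. The following is a minimal bash sketch. java_major_version and jdk_ok are hypothetical helper names, not part of Pulsar. The parsing assumes the usual first line of java -version, such as openjdk version "11.0.2" or java version "1.8.0_292".

```shell
#!/usr/bin/env bash
# java_major_version VERSION_STRING (hypothetical helper)
# Prints the major Java version: "1.8.0_292" -> 8, "11.0.2" -> 11, "17" -> 17.
# Legacy "1.x" version strings map to x.
java_major_version() {
  local v="$1" major
  major="${v%%[._-]*}"          # text before the first '.', '_' or '-'
  if [ "$major" = "1" ]; then
    v="${v#1.}"                 # "1.8.0_292" -> "8.0_292"
    major="${v%%[._-]*}"
  fi
  printf '%s\n' "$major"
}

# jdk_ok (hypothetical helper): succeeds if the java on PATH is 8 or later.
# Assumes java -version prints a line like: openjdk version "11.0.2" 2019-01-15
jdk_ok() {
  local line version
  line="$(java -version 2>&1 | head -n 1)"
  version="$(printf '%s\n' "$line" | sed -n 's/.*version "\([^"]*\)".*/\1/p')"
  [ -n "$version" ] && [ "$(java_major_version "$version")" -ge 8 ]
}
```

For example, jdk_ok && echo "JDK 8 or later found" prints the message only when the java on your PATH reports version 8 or later.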
Start standalone Pulsar
1. Start Pulsar locally.
bin/pulsar standalone
All the components of a Pulsar service (including ZooKeeper, BookKeeper, the broker, and so on) start in order. Because this command keeps running in the foreground, run the following commands in another terminal window. You can use the bin/pulsar-admin brokers healthcheck command to make sure the Pulsar service is up and running.
2. Check the Pulsar binary protocol port.
telnet localhost 6650
3. Check the Pulsar Function cluster.
bin/pulsar-admin functions-worker get-cluster
Output
[{"workerId":"c-standalone-fw-localhost-6750","workerHostname":"localhost","port":6750}]
4. Make sure a public tenant exists.
bin/pulsar-admin tenants list
Output
"public"
5. Make sure a default namespace exists.
bin/pulsar-admin namespaces list public
Output
"public/default"
6. Make sure the table service is enabled successfully.
telnet localhost 4181
Output
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
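The checks above are interactive: telnet keeps the connection open until you quit it. To script the same checks, you can use the following minimal bash sketch, which assumes bash and the coreutils timeout command. It retries a command until the command succeeds, and it opens a TCP connection through bash's /dev/tcp redirection instead of telnet. retry and port_open are hypothetical helper names, not Pulsar tools.

```shell
#!/usr/bin/env bash
# retry MAX DELAY CMD [ARGS...] (hypothetical helper)
# Runs CMD until it exits with status 0, at most MAX times, sleeping DELAY
# seconds between attempts. Returns 0 on success, 1 if every attempt failed.
retry() {
  local max="$1" delay="$2" attempt
  shift 2
  for ((attempt = 1; attempt <= max; attempt++)); do
    if "$@"; then
      return 0
    fi
    echo "attempt ${attempt}/${max} failed: $*" >&2
    if ((attempt < max)); then
      sleep "$delay"
    fi
  done
  return 1
}

# port_open HOST PORT (hypothetical helper)
# Non-interactive alternative to telnet: succeeds if a TCP connection to
# HOST:PORT can be opened within 1 second, using bash's /dev/tcp feature.
port_open() {
  timeout 1 bash -c "exec 3<>/dev/tcp/$1/$2" 2>/dev/null
}
```

For example, after starting bin/pulsar standalone, you could run retry 30 2 bin/pulsar-admin brokers healthcheck, retry 30 2 port_open localhost 6650, and retry 30 2 port_open localhost 4181 from the Pulsar directory. Each command returns as soon as the check passes, or fails after about a minute.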
Start functions
Note: Before starting functions, you need to start Pulsar.
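Step 1 below fails if the tenant already exists. This happens, for example, when you rerun this tutorial or reach the window-function section, which creates the same tenant and namespace again. The following minimal bash sketch creates them only when they are missing. It assumes that it runs from the Pulsar directory and that the list commands print one name per line, possibly quoted, as in the outputs shown in this tutorial. list_contains, ensure_tenant, and ensure_namespace are hypothetical helper names.

```shell
#!/usr/bin/env bash
# list_contains OUTPUT ITEM (hypothetical helper)
# Succeeds if ITEM is one of the lines of a pulsar-admin list output,
# ignoring the double quotes shown in outputs such as "test/test-namespace".
list_contains() {
  printf '%s\n' "$1" | tr -d '"' | grep -Fqx -- "$2"
}

# ensure_tenant TENANT (hypothetical): create TENANT only if it is not listed.
ensure_tenant() {
  if ! list_contains "$(bin/pulsar-admin tenants list)" "$1"; then
    bin/pulsar-admin tenants create "$1"
  fi
}

# ensure_namespace TENANT/NAMESPACE (hypothetical): create only if missing.
ensure_namespace() {
  local tenant="${1%%/*}"       # "test/test-namespace" -> "test"
  if ! list_contains "$(bin/pulsar-admin namespaces list "$tenant")" "$1"; then
    bin/pulsar-admin namespaces create "$1"
  fi
}
```

For example, ensure_tenant test && ensure_namespace test/test-namespace can be run any number of times. It only calls the create commands the first time.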
1. Create a tenant and a namespace.
bin/pulsar-admin tenants create test
bin/pulsar-admin namespaces create test/test-namespace
2. In the same terminal window as step 1, verify the tenant and the namespace.
bin/pulsar-admin namespaces list test
Output
This output shows that both the tenant and the namespace are created successfully.
"test/test-namespace"
3. In the same terminal window as step 1, create a function named example.
Tip: You can find both the example-function-config.yaml and api-examples.jar files under the examples folder of the Pulsar directory on your local machine.
bin/pulsar-admin functions create \
--function-config-file examples/example-function-config.yaml \
--jar examples/api-examples.jar
Output
Created Successfully
4. In the same terminal window as step 1, verify the function's configurations.
bin/pulsar-admin functions get \
--tenant test \
--namespace test-namespace \
--name example
Output
{
"tenant": "test",
"namespace": "test-namespace",
"name": "example",
"className": "org.apache.pulsar.functions.api.examples.ExclamationFunction",
"userConfig": "{\"PublishTopic\":\"test_result\"}",
"autoAck": true,
"parallelism": 1,
"source": {
"topicsToSerDeClassName": {
"test_src": ""
},
"typeClassName": "java.lang.String"
},
"sink": {
"topic": "test_result",
"typeClassName": "java.lang.String"
},
"resources": {}
}
5. In the same terminal window as step 1, verify the function's status.
bin/pulsar-admin functions status \
--tenant test \
--namespace test-namespace \
--name example
Output
"running": true shows that the function is running.
{
"numInstances" : 1,
"numRunning" : 1,
"instances" : [ {
"instanceId" : 0,
"status" : {
"running" : true,
"error" : "",
"numRestarts" : 0,
"numReceived" : 0,
"numSuccessfullyProcessed" : 0,
"numUserExceptions" : 0,
"latestUserExceptions" : [ ],
"numSystemExceptions" : 0,
"latestSystemExceptions" : [ ],
"averageLatency" : 0.0,
"lastInvocationTime" : 0,
"workerId" : "c-standalone-fw-localhost-8080"
}
} ]
}
6. In the same terminal window as step 1, subscribe to the output topic test_result.
bin/pulsar-client consume -s test-sub -n 0 test_result
7. In a new terminal window, produce messages to the input topic test_src.
bin/pulsar-client produce -m "test-messages-`date`" -n 10 test_src
8. In the same terminal window as step 1, the messages produced by the example function are returned.
Output
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
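You can save the consumer output and check it with a script instead of reading the output in the last step. The following is a minimal bash sketch. It assumes that the consumer prints a "----- got message -----" line before each payload, as shown above. check_exclaimed is a hypothetical helper. It counts the payloads and checks that each one ends with the "!" that the example function appends.

```shell
#!/usr/bin/env bash
# check_exclaimed CONSUMER_OUTPUT EXPECTED_COUNT (hypothetical helper)
# Treats the line after each "----- got message -----" marker as a payload,
# then succeeds only if there are EXPECTED_COUNT payloads and all end in "!".
check_exclaimed() {
  local output="$1" expected="$2" summary
  summary="$(printf '%s\n' "$output" | awk '
    prev == "----- got message -----" { total++; if ($0 !~ /!$/) bad++ }
    { prev = $0 }
    END { print total + 0, bad + 0 }')"
  # summary is "<payload count> <payloads without a trailing !>"
  [ "$summary" = "$expected 0" ]
}
```

For example, you could run the consumer with -n 10 so that it exits after 10 messages, redirecting its output with bin/pulsar-client consume -s test-sub -n 10 test_result > consumed.txt 2>&1. Then produce the messages as before and run check_exclaimed "$(cat consumed.txt)" 10. Lines other than markers and payloads, such as log lines, are ignored.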
Start stateful functions
The standalone mode of Pulsar enables the BookKeeper table service for stateful functions. For more information, see Configure state storage.
The following example provides instructions to validate counter functions.
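A counter function keeps a count per key in the state table and increments the count for each input. The following steps show this arithmetic: 10 messages with hello give 10, and another 10 give 20. The bash code below imitates that behavior locally. It is a sketch under stated assumptions, not Pulsar code. It assumes that words are separated by whitespace (the actual splitting rule in WordCountFunction may differ), and it keeps counts in a bash 4+ associative array instead of the table service. count_words and counters are hypothetical names.

```shell
#!/usr/bin/env bash
# Local, in-memory imitation of a counter function (hypothetical, not Pulsar):
# each whitespace-separated word of each input line increments counters[word].
declare -A counters=()

count_words() {
  local message word
  local -a words
  while IFS= read -r message; do
    read -r -a words <<< "$message"      # split one message into words
    for word in "${words[@]}"; do
      counters["$word"]=$(( ${counters["$word"]:-0} + 1 ))
    done
  done
}
```

Feed count_words through a redirection, such as count_words < messages.txt, rather than a pipe. Bash runs each part of a pipeline in a subshell, so counts updated there would be lost.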
Note: Before starting stateful functions, you need to start Pulsar.
1. Create a function named word_count.
bin/pulsar-admin functions create \
--function-config-file examples/example-function-config.yaml \
--jar examples/api-examples.jar \
--name word_count \
--className org.apache.pulsar.functions.api.examples.WordCountFunction \
--inputs test_wordcount_src \
--output test_wordcount_dest
Output
Created Successfully
2. In the same terminal window as step 1, get the information about the word_count function.
bin/pulsar-admin functions get \
--tenant test \
--namespace test-namespace \
--name word_count
Output
{
"tenant": "test",
"namespace": "test-namespace",
"name": "word_count",
"className": "org.apache.pulsar.functions.api.examples.WordCountFunction",
"inputSpecs": {
"test_wordcount_src": {
"isRegexPattern": false
}
},
"output": "test_wordcount_dest",
"processingGuarantees": "ATLEAST_ONCE",
"retainOrdering": false,
"userConfig": {
"PublishTopic": "test_result"
},
"runtime": "JAVA",
"autoAck": true,
"parallelism": 1,
"resources": {
"cpu": 1.0,
"ram": 1073741824,
"disk": 10737418240
},
"cleanupSubscription": true
}
3. In the same terminal window as step 1, get the status of the word_count function.
bin/pulsar-admin functions status \
--tenant test \
--namespace test-namespace \
--name word_count
Output
{
"numInstances" : 1,
"numRunning" : 1,
"instances" : [ {
"instanceId" : 0,
"status" : {
"running" : true,
"error" : "",
"numRestarts" : 0,
"numReceived" : 0,
"numSuccessfullyProcessed" : 0,
"numUserExceptions" : 0,
"latestUserExceptions" : [ ],
"numSystemExceptions" : 0,
"latestSystemExceptions" : [ ],
"averageLatency" : 0.0,
"lastInvocationTime" : 0,
"workerId" : "c-standalone-fw-localhost-8080"
}
} ]
}
4. In the same terminal window as step 1, query the state table for the function with the key hello. This operation watches the changes associated with hello.
bin/pulsar-admin functions querystate \
--tenant test \
--namespace test-namespace \
--name word_count -k hello -w
Tip: For more information about the pulsar-admin functions querystate command, including flags, descriptions, default values, and shorthands, see Admin API.
Output
key 'hello' doesn't exist.
key 'hello' doesn't exist.
key 'hello' doesn't exist.
...
5. In a new terminal window, update the value of hello to 10 using one of the following methods. Method 1 produces 10 messages with hello to the input topic test_wordcount_src. Method 2 writes the state directly.
Method 1
bin/pulsar-client produce -m "hello" -n 10 test_wordcount_src
Method 2
bin/pulsar-admin functions putstate \
--tenant test \
--namespace test-namespace \
--name word_count \
--state '{"key":"hello","numberValue":10}'
Tip: For more information about the pulsar-admin functions putstate command, including flags, descriptions, default values, and shorthands, see Admin API.
6. In the same terminal window as step 1, check the result. The result shows that the value of hello in the state table is updated to 10.
Output
{
"key": "hello",
"numberValue": 10,
"version": 9
}
7. In the same terminal window as step 5, produce another 10 messages with hello. The value of hello is updated to 20.
bin/pulsar-client produce -m "hello" -n 10 test_wordcount_src
8. In the same terminal window as step 1, check the result. The result shows that the value of hello in the state table is updated to 20.
value = 10
value = 20
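You can also script the result checks above instead of watching the output. Run querystate without -w, which prints the current state once, and extract numberValue. The following is a minimal bash sketch that uses sed. It assumes the JSON layout shown in the output above. state_number_value is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# state_number_value JSON (hypothetical helper)
# Prints the numberValue field of a querystate result such as
#   { "key": "hello", "numberValue": 10, "version": 9 }
# Text-based parsing with sed; assumes numberValue appears once.
# Prints nothing if the field is absent.
state_number_value() {
  printf '%s\n' "$1" | tr -d '\n' |
    sed -n 's/.*"numberValue"[[:space:]]*:[[:space:]]*\(-\{0,1\}[0-9][0-9]*\).*/\1/p'
}
```

For example, from the Pulsar directory:
value="$(state_number_value "$(bin/pulsar-admin functions querystate --tenant test --namespace test-namespace --name word_count -k hello)")"
[ "$value" = 20 ] && echo "hello reached 20"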
Start window functions
Window functions are a special form of Pulsar Functions. For more information, see concepts.
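As a rough illustration of windowing, the following bash sketch groups a stream of messages into count-based tumbling windows, which are consecutive, non-overlapping batches of a fixed number of messages. It is a conceptual sketch only, not how Pulsar implements window functions. tumbling_count_window is a hypothetical name. Pulsar window functions can also use sliding and time-based windows.

```shell
#!/usr/bin/env bash
# tumbling_count_window SIZE (hypothetical, conceptual sketch)
# Reads one message per line and prints each complete, non-overlapping
# window of SIZE messages as one comma-separated line.
# A trailing window with fewer than SIZE messages is not printed.
tumbling_count_window() {
  local size="$1" message
  local -a window=()
  while IFS= read -r message; do
    window+=("$message")
    if (( ${#window[@]} == size )); then
      (IFS=,; printf '%s\n' "${window[*]}")   # join the window with commas
      window=()                               # start the next window
    fi
  done
}
```

For example, printf '%s\n' a b c d e | tumbling_count_window 2 prints a,b and c,d. The message e stays in an incomplete window and is not printed.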
Note: Before starting window functions, you need to start Pulsar.
1. Create a tenant and a namespace.
bin/pulsar-admin tenants create test
bin/pulsar-admin namespaces create test/test-namespace
2. In the same terminal window as step 1, verify the tenant and the namespace.
bin/pulsar-admin namespaces list test
Output
This output shows that both the tenant and the namespace are created successfully.
"test/test-namespace"
3. In the same terminal window as step 1, create a function named example.
Tip: You can find both the example-window-function-config.yaml and api-examples.jar files under the examples folder of the Pulsar directory on your local machine.
bin/pulsar-admin functions create \
--function-config-file examples/example-window-function-config.yaml \
--jar examples/api-examples.jar
Output
Created Successfully
4. In the same terminal window as step 1, verify the function's configurations.
bin/pulsar-admin functions get \
--tenant test \
--namespace test-namespace \
--name example
Output
{
"tenant": "test",
"namespace": "test-namespace",
"name": "example",
"className": "org.apache.pulsar.functions.api.examples.ExclamationFunction",
"userConfig": "{\"PublishTopic\":\"test_result\"}",
"autoAck": true,
"parallelism": 1,
"source": {
"topicsToSerDeClassName": {
"test_src": ""
},
"typeClassName": "java.lang.String"
},
"sink": {
"topic": "test_result",
"typeClassName": "java.lang.String"
},
"resources": {}
}
5. In the same terminal window as step 1, verify the function's status.
bin/pulsar-admin functions status \
--tenant test \
--namespace test-namespace \
--name example
Output
"running": true shows that the function is running.
{
"numInstances" : 1,
"numRunning" : 1,
"instances" : [ {
"instanceId" : 0,
"status" : {
"running" : true,
"error" : "",
"numRestarts" : 0,
"numReceived" : 0,
"numSuccessfullyProcessed" : 0,
"numUserExceptions" : 0,
"latestUserExceptions" : [ ],
"numSystemExceptions" : 0,
"latestSystemExceptions" : [ ],
"averageLatency" : 0.0,
"lastInvocationTime" : 0,
"workerId" : "c-standalone-fw-localhost-8080"
}
} ]
}
6. In the same terminal window as step 1, subscribe to the output topic test_result.
bin/pulsar-client consume -s test-sub -n 0 test_result
7. In a new terminal window, produce messages to the input topic test_src.
bin/pulsar-client produce -m "test-messages-`date`" -n 10 test_src
8. In the same terminal window as step 1, the messages produced by the window function example are returned.
Output
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
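The status steps in this tutorial check "running": true by eye. The following minimal bash sketch does the same check from a script by comparing numRunning with numInstances in the status output. It assumes the JSON layout shown in the status outputs above. json_int_field and all_instances_running are hypothetical helper names.

```shell
#!/usr/bin/env bash
# json_int_field JSON FIELD (hypothetical helper)
# Prints the non-negative integer value of "FIELD" : N from JSON text.
# Text-based parsing; assumes FIELD appears once in the document.
json_int_field() {
  printf '%s\n' "$1" | tr -d '\n' |
    sed -n "s/.*\"$2\"[[:space:]]*:[[:space:]]*\([0-9][0-9]*\).*/\1/p"
}

# all_instances_running STATUS_JSON (hypothetical helper)
# Succeeds if numInstances is greater than 0 and numRunning equals it.
all_instances_running() {
  local instances running
  instances="$(json_int_field "$1" numInstances)"
  running="$(json_int_field "$1" numRunning)"
  [ -n "$instances" ] && [ "$instances" -gt 0 ] && [ "$running" = "$instances" ]
}
```

For example, from the Pulsar directory:
status="$(bin/pulsar-admin functions status --tenant test --namespace test-namespace --name example)"
all_instances_running "$status" && echo "all instances of example are running"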