Tutorials
Write a function for word count
The following example is a stateful function. By default, the state of a function is disabled. See Enable stateful functions for more instructions.
-
Write the function in Java using the SDK for Java.
package org.example.functions;import org.apache.pulsar.functions.api.Context;import org.apache.pulsar.functions.api.Function;import java.util.Arrays;public class WordCountFunction implements Function<String, Void> {// This function is invoked every time a message is published to the input topic@Overridepublic Void process(String input, Context context) throws Exception {Arrays.asList(input.split(" ")).forEach(word -> {String counterKey = word.toLowerCase();context.incrCounter(counterKey, 1);});return null;}} -
Bundle and build the JAR file, and then deploy it in your Pulsar cluster using the
pulsar-admincommand.bin/pulsar-admin functions create \--jar $PWD/target/my-jar-with-dependencies.jar \--classname org.example.functions.WordCountFunction \--tenant public \--namespace default \--name word-count \--inputs persistent://public/default/sentences \--output persistent://public/default/count
Write a function for content-based routing
-
Write the function in Python using the SDK for Python.
from pulsar import Functionclass RoutingFunction(Function):def __init__(self):self.fruits_topic = "persistent://public/default/fruits"self.vegetables_topic = "persistent://public/default/vegetables"def is_fruit(item):return item in [b"apple", b"orange", b"pear", b"other fruits..."]def is_vegetable(item):return item in [b"carrot", b"lettuce", b"radish", b"other vegetables..."]def process(self, item, context):if self.is_fruit(item):context.publish(self.fruits_topic, item)elif self.is_vegetable(item):context.publish(self.vegetables_topic, item)else:warning = "The item {0} is neither a fruit nor a vegetable".format(item)context.get_logger().warn(warning) -
Suppose this code is stored in
~/router.py, then you can deploy it in your Pulsar cluster using thepulsar-admincommand.bin/pulsar-admin functions create \--py ~/router.py \--classname router.RoutingFunction \--tenant public \--namespace default \--name route-fruit-veg \--inputs persistent://public/default/basket-items
Write a window function for word count
Currently, window functions are only available in Java.
This example demonstrates how to use the language-native interface to write a window function in Java.
Each input message is a sentence that is split into words and each word is counted. The built-in counter state is used to keep track of the word count in a persistent and consistent manner.
public class WordCountFunction implements Function<String, Void> {
@Override
public Void process(String input, Context context) {
Arrays.asList(input.split("\\s+")).forEach(word -> context.incrCounter(word, 1));
return null;
}
}