Getting started with Pulsar Functions

This tutorial will walk you through running a standalone Pulsar cluster on your machine and then running your first Pulsar Functions using that cluster. The first function will run in local run mode (outside your Pulsar cluster), while the second will run in cluster mode (inside your cluster).

In local run mode, your Pulsar Function will communicate with your Pulsar cluster but will run outside of the cluster.

Prerequisites​

In order to follow along with this tutorial, you'll need to have Maven installed on your machine.

Run a standalone Pulsar cluster​

In order to run our Pulsar Functions, we'll need to run a Pulsar cluster locally first. The easiest way to do that is to run Pulsar in standalone mode. Follow these steps to start up a standalone cluster:

$wget https://archive.apache.org/dist/pulsar/pulsar-2.3.0/apache-pulsar-2.3.0-bin.tar.gz$ tar xvfz apache-pulsar-2.3.0-bin.tar.gz
$cd apache-pulsar-2.3.0$ bin/pulsar standalone \

When running Pulsar in standalone mode, the public tenant and default namespace will be created automatically for you. That tenant and namespace will be used throughout this tutorial.

Run a Pulsar Function in local run mode​

Let's start with a simple function that takes a string as input from a Pulsar topic, adds an exclamation point to the end of the string, and then publishes that new string to another Pulsar topic. Here's the code for the function:

package org.apache.pulsar.functions.api.examples;

import java.util.function.Function;

public class ExclamationFunction implements Function<String, String> {
@Override
public String apply(String input) {
return String.format("%s!", input);
}
}

A JAR file containing this and several other functions (written in Java) is included with the binary distribution you downloaded above (in the examples folder). To run the function in local mode, i.e. on our laptop but outside our Pulsar cluster:

$bin/pulsar-admin functions localrun \ --jar examples/api-examples.jar \ --classname org.apache.pulsar.functions.api.examples.ExclamationFunction \ --inputs persistent://public/default/exclamation-input \ --output persistent://public/default/exclamation-output \ --name exclamation Multiple input topics allowed​ In the example above, a single topic was specified using the --inputs flag. You can also specify multiple input topics as a comma-separated list using the same flag. Here's an example: --inputs topic1,topic2 We can open up another shell and use the pulsar-client tool to listen for messages on the output topic:$ bin/pulsar-client consume persistent://public/default/exclamation-output \
--subscription-name my-subscription \
--num-messages 0

Setting the --num-messages flag to 0 means that the consumer will listen on the topic indefinitely (rather than only accepting a certain number of messages).

With a listener up and running, we can open up another shell and produce a message on the input topic that we specified:

$bin/pulsar-client produce persistent://public/default/exclamation-input \ --num-produce 1 \ --messages "Hello world" In the output, you should see the following: ----- got message ----- Hello world! Success! As you can see, the message has been successfully processed by the exclamation function. To shut down the function, simply hit Ctrl+C. Here's what happened: • The Hello world message that we published to the input topic (persistent://public/default/exclamation-input) was passed to the exclamation function that we ran on our machine • The exclamation function processed the message (providing a result of Hello world!) and published the result to the output topic (persistent://public/default/exclamation-output). • If our exclamation function hadn't been running, Pulsar would have durably stored the message data published to the input topic in Apache BookKeeper until a consumer consumed and acknowledged the message Run a Pulsar Function in cluster mode​ Local run mode is useful for development and experimentation, but if you want to use Pulsar Functions in a real Pulsar deployment, you'll want to run them in cluster mode. In this mode, Pulsar Functions run inside your Pulsar cluster and are managed using the same pulsar-admin functions interface that we've been using thus far. This command, for example, would deploy the same exclamation function we ran locally above in our Pulsar cluster (rather than outside it):$ bin/pulsar-admin functions create \
--jar examples/api-examples.jar \
--classname org.apache.pulsar.functions.api.examples.ExclamationFunction \
--inputs persistent://public/default/exclamation-input \
--output persistent://public/default/exclamation-output \
--name exclamation

You should see Created successfully in the output. Now, let's see a list of functions running in our cluster:

$bin/pulsar-admin functions list \ --tenant public \ --namespace default We should see just the exclamation function listed there. We can also check the status of our deployed function using the getstatus command:$ bin/pulsar-admin functions getstatus \
--tenant public \
--namespace default \
--name exclamation

You should see this JSON output:

{
"functionStatusList": [
{
"running": true,
"instanceId": "0"
}
]
}

As we can see, (a) the instance is currently running and (b) there is one instance, with an ID of 0, running. We can get other information about the function (topics, tenant, namespace, etc.) using the get command instead of getstatus:

$bin/pulsar-admin functions get \ --tenant public \ --namespace default \ --name exclamation You should see this JSON output: { "tenant": "public", "namespace": "default", "name": "exclamation", "className": "org.apache.pulsar.functions.api.examples.ExclamationFunction", "output": "persistent://public/default/exclamation-output", "autoAck": true, "inputs": [ "persistent://public/default/exclamation-input" ], "parallelism": 1 } As we can see, the parallelism of the function is 1, meaning that only one instance of the function is running in our cluster. Let's update our function to a parallelism of 3 using the update command:$ bin/pulsar-admin functions update \
--jar examples/api-examples.jar \
--classname org.apache.pulsar.functions.api.examples.ExclamationFunction \
--inputs persistent://public/default/exclamation-input \
--output persistent://public/default/exclamation-output \
--tenant public \
--namespace default \
--name exclamation \
--parallelism 3

You should see Updated successfully in the output. If you run the get command from above for the function, you can see that the parallelism has increased to 3, meaning that there are now three instances of the function running in our cluster:

{
"tenant": "public",
"namespace": "default",
"name": "exclamation",
"className": "org.apache.pulsar.functions.api.examples.ExclamationFunction",
"output": "persistent://public/default/exclamation-output",
"autoAck": true,
"inputs": [
"persistent://public/default/exclamation-input"
],
"parallelism": 3
}

Finally, we can shut down our running function using the delete command:

$bin/pulsar-admin functions delete \ --tenant public \ --namespace default \ --name exclamation If you see Deleted successfully in the output, then you've successfully run, updated, and shut down a Pulsar Function running in cluster mode. Congrats! Now, let's go even further and run a brand new function in the next section. Writing and running a new function​ In order to write and run the Python function below, you'll need to install a few dependencies:$ pip install pulsar-client

In the above examples, we ran and managed a pre-written Pulsar Function and saw how it worked. To really get our hands dirty, let's write and our own function from scratch, using the Python API. This simple function will also take a string as input but it will reverse the string and publish the resulting, reversed string to the specified topic.

First, create a new Python file:

$touch reverse.py In that file, add the following: def process(input): return input[::-1] Here, the process method defines the processing logic of the Pulsar Function. It simply uses some Python slice magic to reverse each incoming string. Now, we can deploy the function using create:$ bin/pulsar-admin functions create \
--py reverse.py \
--classname reverse \
--inputs persistent://public/default/backwards \
--output persistent://public/default/forwards \
--tenant public \
--namespace default \
--name reverse

If you see Created successfully, the function is ready to accept incoming messages. Because the function is running in cluster mode, we can trigger the function using the trigger command. This command will send a message that we specify to the function and also give us the function's output. Here's an example:

--name reverse \
--tenant public \
--namespace default \
--trigger-value "sdrawrof won si tub sdrawkcab saw gnirts sihT"

You should get this output:

This string was backwards but is now forwards

Once again, success! We created a brand new Pulsar Function, deployed it in our Pulsar standalone cluster in cluster mode and successfully triggered the function. If you're ready for more, check out one of these docs:

Packaging python dependencies​

For python functions requiring dependencies to be deployable in pulsar worker instances in an offline manner, we need to package the artifacts as below.

Client Requirements​

Following programs are required to be installed on the client machine

pip \\ required for getting python dependencies
zip \\ for building zip archives

Python Dependencies​

A file named requirements.txt is needed with required dependencies for the python function

sh==1.12.14

Prepare the pulsar function in folder called src.

Run the following command to gather the python dependencies in the folder caller deps

--only-binary :all: \
--platform manylinux1_x86_64 \
--python-version 27 \
--implementation cp \
--abi cp27m -r requirements.txt -d deps

Sample output

Collecting sh==1.12.14 (from -r requirements.txt (line 1))
Using cached https://files.pythonhosted.org/packages/4a/22/17b22ef5b049f12080f5815c41bf94de3c229217609e469001a8f80c1b3d/sh-1.12.14-py2.py3-none-any.whl
Saved ./deps/sh-1.12.14-py2.py3-none-any.whl

Note pulsar-client is not needed as a dependency as it already installed in the worker node.

Packaging​

Create a destination folder with the desired pacaking name eg : exclamation, copy src and deps folder into it and finally compress the folder into a zip archive.

Sample sequence

cp -R deps exclamation/
cp -R src exclamation/

ls -la exclamation/
total 7
drwxr-xr-x 5 a.ahmed staff 160 Nov 6 17:51 .
drwxr-xr-x 12 a.ahmed staff 384 Nov 6 17:52 ..
drwxr-xr-x 3 a.ahmed staff 96 Nov 6 17:51 deps
drwxr-xr-x 3 a.ahmed staff 96 Nov 6 17:51 src

zip -r exclamation.zip exclamation

Archive exclamation.zip can we deployed as function into a pulsar worker, the worker does not need internet connectivity to download packages as they are all included in the zip file.