Skip to main content

Get started with Pulsar Functions

This tutorial walks you through running a standalone Pulsar cluster on your machine, and then running your first Pulsar Function using that cluster. The first Pulsar Function runs in local run mode (outside your Pulsar cluster), while the second runs in cluster mode (inside your cluster).

In local run mode, Pulsar Functions communicate with Pulsar cluster, but run outside of the cluster.

Prerequisites

Install Maven on your machine.

Run a standalone Pulsar cluster

In order to run Pulsar Functions, you need to run a Pulsar cluster locally first. The easiest way is to run Pulsar in standalone mode. Follow these steps to start up a standalone cluster.


$ wget https://archive.apache.org/dist/pulsar/pulsar-2.4.0/apache-pulsar-2.4.0-bin.tar.gz
$ tar xvfz apache-pulsar-2.4.0-bin.tar.gz
$ cd apache-pulsar-2.4.0
$ bin/pulsar standalone \
--advertised-address 127.0.0.1

When running Pulsar in standalone mode, the public tenant and the default namespace are created automatically. The tenant and namespace are used throughout this tutorial.

Run a Pulsar Function in local run mode

You can start with a simple function that takes a string as input from a Pulsar topic, adds an exclamation point to the end of the string, and then publishes the new string to another Pulsar topic. The following is the code for the function.


package org.apache.pulsar.functions.api.examples;

import java.util.function.Function;

public class ExclamationFunction implements Function<String, String> {
@Override
public String apply(String input) {
return String.format("%s!", input);
}
}

A JAR file containing this function and several other functions (written in Java) is included with the binary distribution you have downloaded (in the examples folder). Run the function in local mode on your laptop but outside your Pulsar cluster with the following commands.


$ bin/pulsar-admin functions localrun \
--jar examples/api-examples.jar \
--classname org.apache.pulsar.functions.api.examples.ExclamationFunction \
--inputs persistent://public/default/exclamation-input \
--output persistent://public/default/exclamation-output \
--name exclamation

Multiple input topics

In the example above, a single topic is specified using the --inputs flag. You can also specify multiple input topics with a comma-separated list using the same flag.


--inputs topic1,topic2


You can open up another shell and use the pulsar-client tool to listen for messages on the output topic.


$ bin/pulsar-client consume persistent://public/default/exclamation-output \
--subscription-name my-subscription \
--num-messages 0

Setting the --num-messages flag to 0 means that consumers listen on the topic indefinitely, rather than only accepting a certain number of messages.

With a listener up and running, you can open up another shell and produce a message on the input topic that you specify.


$ bin/pulsar-client produce persistent://public/default/exclamation-input \
--num-produce 1 \
--messages "Hello world"

When the message has been successfully processed by the exclamation function, you will see the following output. To shut down the function, press Ctrl+C.


----- got message -----
Hello world!

Process explanation

  • The Hello world message you publish to the input topic (persistent://public/default/exclamation-input) is passed to the exclamation function.
  • The exclamation function processes the message (providing a result of Hello world!) and publishes the result to the output topic (persistent://public/default/exclamation-output).
  • If the exclamation function does not run, Pulsar will durably store the message data published to the input topic in Apache BookKeeper until a consumer consumes and acknowledges the message.

Run a Pulsar Function in cluster mode

Local run mode is useful for development and test. However, when you use Pulsar for real deployment, you run it in cluster mode. In cluster mode, Pulsar Functions run inside of your Pulsar cluster and are managed using the same pulsar-admin functions interface.

The following command deploys the same exclamation function you run locally in your Pulsar cluster, rather than outside of it.


$ bin/pulsar-admin functions create \
--jar examples/api-examples.jar \
--classname org.apache.pulsar.functions.api.examples.ExclamationFunction \
--inputs persistent://public/default/exclamation-input \
--output persistent://public/default/exclamation-output \
--name exclamation

You will see Created successfully in the output. Check the list of functions running in your cluster.


$ bin/pulsar-admin functions list \
--tenant public \
--namespace default

You will see the exclamation function. Check the status of your deployed function using the getstatus command.


$ bin/pulsar-admin functions getstatus \
--tenant public \
--namespace default \
--name exclamation

You will see the following JSON output.


{
"functionStatusList": [
{
"running": true,
"instanceId": "0"
}
]
}

As you can see, the instance is currently running, and an instance with the ID of 0 is running. With the get command, you can get other information about the function, for example, topics, tenant, namespace, and so on.


$ bin/pulsar-admin functions get \
--tenant public \
--namespace default \
--name exclamation

You will see the following JSON output.


{
"tenant": "public",
"namespace": "default",
"name": "exclamation",
"className": "org.apache.pulsar.functions.api.examples.ExclamationFunction",
"output": "persistent://public/default/exclamation-output",
"autoAck": true,
"inputs": [
"persistent://public/default/exclamation-input"
],
"parallelism": 1
}

As you can see, only one instance of the function is running in your cluster. Update the parallel functions to 3 using the update command.


$ bin/pulsar-admin functions update \
--jar examples/api-examples.jar \
--classname org.apache.pulsar.functions.api.examples.ExclamationFunction \
--inputs persistent://public/default/exclamation-input \
--output persistent://public/default/exclamation-output \
--tenant public \
--namespace default \
--name exclamation \
--parallelism 3

You will see Updated successfully in the output. If you enter the get command, you see that the parallel functions are increased to 3, meaning that three instances of the function are running in your cluster.


{
"tenant": "public",
"namespace": "default",
"name": "exclamation",
"className": "org.apache.pulsar.functions.api.examples.ExclamationFunction",
"output": "persistent://public/default/exclamation-output",
"autoAck": true,
"inputs": [
"persistent://public/default/exclamation-input"
],
"parallelism": 3
}

Shut down the running function with the delete command.


$ bin/pulsar-admin functions delete \
--tenant public \
--namespace default \
--name exclamation

When you see Deleted successfully in the output, you've successfully run, updated, and shut down functions running in cluster mode.

Write and run a new function

In order to write and run Python functions, you need to install some dependencies.


$ pip install pulsar-client

In the examples above, you run and manage pre-written Pulsar Functions and learn how they work. You can also write your own functions with Python API. In the following example, the function takes a string as input, reverses the string, and publishes the reversed string to the specified topic.

First, create a new Python file.


$ touch reverse.py

Add the following information in the Python file.


def process(input):
return input[::-1]

The process method defines the processing logic of Pulsar Functions. It uses Python slice magic to reverse each incoming string. You can deploy the function using the create command.


$ bin/pulsar-admin functions create \
--py reverse.py \
--classname reverse \
--inputs persistent://public/default/backwards \
--output persistent://public/default/forwards \
--tenant public \
--namespace default \
--name reverse

If you see Created successfully, the function is ready to accept incoming messages. Because the function is running in cluster mode, you can trigger the function using the trigger command. This command sends a message that you specify to the function and returns the function output. The following is an example.


$ bin/pulsar-admin functions trigger \
--name reverse \
--tenant public \
--namespace default \
--trigger-value "sdrawrof won si tub sdrawkcab saw gnirts sihT"

You will get the following output.


This string was backwards but is now forwards

You have created a new Pulsar Function, deployed it in your Pulsar standalone cluster in cluster mode, and triggered the Function.

Write and run a Go function

Go function depends on pulsar-client-go. Make sure that you have built pulsar-client-go before using Go function.

To write and run a Go function, complete the following steps.

  1. Create a new Go file.

touch helloFunc.go

  1. Append a byte for messages from the input topic. The following is a helloFunc.go example. Each message from the input topic is appended with a 110 byte, and then delivered to the output topic.

package main

import (
"context"

"github.com/apache/pulsar/pulsar-function-go/pf"
)

func HandleResponse(ctx context.Context, in []byte) ([]byte, error) {
res := append(in, 110)
return res, nil
}

func main() {
pf.Start(HandleResponse)
}

  1. Compile code.

go build -o examplepulsar helloFunc.go

  1. Run Go function.

$ bin/pulsar-admin functions create \
--go examplepulsar \
--inputs persistent://public/default/backwards \
--output persistent://public/default/forwards \
--tenant public \
--namespace default \
--name gofunc

If you see Created successfully, the function is ready to accept incoming messages. Start a producer and produce messages to the backwards input topic. Start a consumer and consume messages from the forwards output topic, you will see 110 is appended to all messages.

The --classname parameter is not specified when running Go function, because there is no Class concept in Go, which is different from Java and Python.

note

When you use the --go command to specify an executable file, make sure you have executable permissions.

Package Python dependencies

When you deploy Python functions in a cluster offline, you need to package the required dependencies in a ZIP file before deployment.

Client requirements

The following programs are required to be installed on the client machine.


pip \\ required for getting python dependencies
zip \\ for building zip archives

Python dependencies

A file named requirements.txt is needed with required dependencies for the Python function.


sh==1.12.14

Prepare the Pulsar Function in the src folder.

Run the following command to gather Python dependencies in the deps folder.


pip download \
--only-binary :all: \
--platform manylinux1_x86_64 \
--python-version 27 \
--implementation cp \
--abi cp27m -r requirements.txt -d deps

Sample output


Collecting sh==1.12.14 (from -r requirements.txt (line 1))
Using cached https://files.pythonhosted.org/packages/4a/22/17b22ef5b049f12080f5815c41bf94de3c229217609e469001a8f80c1b3d/sh-1.12.14-py2.py3-none-any.whl
Saved ./deps/sh-1.12.14-py2.py3-none-any.whl
Successfully downloaded sh

note

pulsar-client is not needed as a dependency as it has already installed in the worker node.

Package

Create a destination folder with the desired package name, for example, exclamation. Copy the src and deps folders into it, and compress the folder into a ZIP archive.

Sample sequence


cp -R deps exclamation/
cp -R src exclamation/

ls -la exclamation/
total 7
drwxr-xr-x 5 a.ahmed staff 160 Nov 6 17:51 .
drwxr-xr-x 12 a.ahmed staff 384 Nov 6 17:52 ..
drwxr-xr-x 3 a.ahmed staff 96 Nov 6 17:51 deps
drwxr-xr-x 3 a.ahmed staff 96 Nov 6 17:51 src

zip -r exclamation.zip exclamation

After package all the required dependencies into the exclamation.zip file, you can deploy functions in a Pulsar worker. The Pulsar worker does not need internet connectivity to download packages, because they are all included in the ZIP file.