Configure Functions runtime

You can use the following methods to run functions.

  • Thread: The functions worker invokes functions in threads.
  • Process: The functions worker invokes functions in processes that it forks.
  • Kubernetes: The functions worker submits functions as Kubernetes StatefulSets.
note

Pulsar supports adding labels to the Kubernetes StatefulSets and services while launching functions, which facilitates selecting the target Kubernetes objects.

The differences between the thread and process modes are:

  • Thread mode: the function runs in the same Java virtual machine (JVM) as the functions worker.
  • Process mode: the function runs in its own process on the same machine as the functions worker.

Configure thread runtime

It is easy to configure Thread runtime. In most cases, you do not need to configure anything. You can customize the thread group name with the following settings:


functionRuntimeFactoryClassName: org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory
functionRuntimeFactoryConfigs:
  threadGroupName: "Your Function Container Group"

Thread runtime is supported only for Java functions.

Configure process runtime

When you enable Process runtime, you do not need to configure anything.


functionRuntimeFactoryClassName: org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory
functionRuntimeFactoryConfigs:
  # the directory for storing the function logs
  logDirectory:
  # change the jar location only when you put the java instance jar in a different location
  javaInstanceJarLocation:
  # change the python instance location only when you put the python instance jar in a different location
  pythonInstanceLocation:
  # change the extra dependencies location:
  extraFunctionDependenciesDir:

Process runtime is supported in Java, Python, and Go functions.

Configure Kubernetes runtime

The Kubernetes runtime works by having the functions worker generate Kubernetes manifests and apply them. If you run the functions worker on Kubernetes, you can use the serviceAccount associated with the pod that the functions worker is running in. Otherwise, you can configure it to communicate with a Kubernetes cluster.

The manifests, generated by the functions worker, include a StatefulSet, a Service (used to communicate with the pods), and a Secret for auth credentials (when applicable). The StatefulSet manifest (by default) has a single pod, with the number of replicas determined by the "parallelism" of the function. On pod boot, the pod downloads the function payload (via the functions worker REST API). The pod's container image is configurable, but must have the functions runtime.

The Kubernetes runtime supports secrets, so you can create a Kubernetes secret and expose it as an environment variable in the pod. The Kubernetes runtime is also extensible: you can implement classes that customize how Kubernetes manifests are generated, how auth data is passed to pods, and how secrets are integrated.

tip

For the rules of translating Pulsar object names into Kubernetes resource labels, see here.

Basic configuration

It is easy to configure Kubernetes runtime. You can just uncomment the settings of kubernetesContainerFactory in the functions_worker.yml file. The following is an example.


functionRuntimeFactoryClassName: org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory
functionRuntimeFactoryConfigs:
  # uri to kubernetes cluster, leave it to empty and it will use the kubernetes settings in function worker
  k8Uri:
  # the kubernetes namespace to run the function instances. it is `default`, if this setting is left to be empty
  jobNamespace:
  # The Kubernetes pod name to run the function instances. It is set to
  # `pf-<tenant>-<namespace>-<function_name>-<random_uuid(8)>` if this setting is left to be empty
  jobName:
  # the docker image to run function instance. by default it is `apachepulsar/pulsar`
  pulsarDockerImageName:
  # the docker image to run function instance according to different configurations provided by users.
  # By default it is `apachepulsar/pulsar`.
  # e.g:
  # functionDockerImages:
  #   JAVA: JAVA_IMAGE_NAME
  #   PYTHON: PYTHON_IMAGE_NAME
  #   GO: GO_IMAGE_NAME
  functionDockerImages:
  # The image pull policy for image used to run function instance. By default it is `IfNotPresent`
  imagePullPolicy: IfNotPresent
  # the root directory of pulsar home directory in `pulsarDockerImageName`. by default it is `/pulsar`.
  # if you are using your own built image in `pulsarDockerImageName`, you need to set this setting accordingly
  pulsarRootDir:
  # The config admin CLI allows users to customize the configuration of the admin cli tool, such as:
  # `/bin/pulsar-admin and /bin/pulsarctl`. By default it is `/bin/pulsar-admin`. If you want to use `pulsarctl`
  # you need to set this setting accordingly
  configAdminCLI:
  # this setting only takes effect if `k8Uri` is set to null. if your function worker is running as a k8s pod,
  # setting this to true lets the function worker submit functions to the same k8s cluster the function worker
  # is running in. set this to false if your function worker is not running as a k8s pod.
  submittingInsidePod: false
  # setting the pulsar service url that pulsar function should use to connect to pulsar
  # if it is not set, it will use the pulsar service url configured in worker service
  pulsarServiceUrl:
  # setting the pulsar admin url that pulsar function should use to connect to pulsar
  # if it is not set, it will use the pulsar admin url configured in worker service
  pulsarAdminUrl:
  # The flag indicates to install user code dependencies. (applied to python package)
  installUserCodeDependencies:
  # The repository that pulsar functions use to download python dependencies
  pythonDependencyRepository:
  # The repository that pulsar functions use to download extra python dependencies
  pythonExtraDependencyRepository:
  # the custom labels that function worker uses to select the nodes for pods
  customLabels:
  # The expected metrics collection interval, in seconds
  expectedMetricsCollectionInterval: 30
  # Kubernetes Runtime will periodically check back on
  # this configMap if defined and if there are any changes
  # to the kubernetes specific stuff, we apply those changes
  changeConfigMap:
  # The namespace for storing change config map
  changeConfigMapNamespace:
  # The ratio cpu request and cpu limit to be set for a function/source/sink.
  # The formula for cpu request is cpuRequest = userRequestCpu / cpuOverCommitRatio
  cpuOverCommitRatio: 1.0
  # The ratio memory request and memory limit to be set for a function/source/sink.
  # The formula for memory request is memoryRequest = userRequestMemory / memoryOverCommitRatio
  memoryOverCommitRatio: 1.0
  # The port inside the function pod which is used by the worker to communicate with the pod
  grpcPort: 9093
  # The port inside the function pod on which prometheus metrics are exposed
  metricsPort: 9094
  # The directory inside the function pod where nar packages will be extracted
  narExtractionDirectory:
  # The classpath where function instance files stored
  functionInstanceClassPath:
  # Upload the builtin sources/sinks to BookKeeper.
  # True by default.
  uploadBuiltinSinksSources: true
  # the directory for dropping extra function dependencies
  # if it is not an absolute path, it is relative to `pulsarRootDir`
  extraFunctionDependenciesDir:
  # Additional memory padding added on top of the memory requested by the function, on a per-instance basis
  percentMemoryPadding: 10
  # The duration (in seconds) before the StatefulSet is deleted after a function stops or restarts.
  # Value must be a non-negative integer. 0 indicates the StatefulSet is deleted immediately.
  # Default is 5 seconds.
  gracePeriodSeconds: 5

If you run functions worker embedded in a broker on Kubernetes, you can use the default settings.
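
The over-commit ratios and the memory padding in the configuration above are plain arithmetic. The following sketch works through the two request formulas quoted in the comments. The helper names are hypothetical, and the way `percentMemoryPadding` is applied here (a percentage added on top of the requested memory) is an assumption based on the comment text, not the worker's actual code.

```python
def cpu_request(user_request_cpu: float, cpu_over_commit_ratio: float = 1.0) -> float:
    """cpuRequest = userRequestCpu / cpuOverCommitRatio (formula from the config comments)."""
    if cpu_over_commit_ratio <= 0:
        raise ValueError("cpuOverCommitRatio must be positive")
    return user_request_cpu / cpu_over_commit_ratio


def memory_request(user_request_memory: float, memory_over_commit_ratio: float = 1.0) -> float:
    """memoryRequest = userRequestMemory / memoryOverCommitRatio (formula from the config comments)."""
    if memory_over_commit_ratio <= 0:
        raise ValueError("memoryOverCommitRatio must be positive")
    return user_request_memory / memory_over_commit_ratio


def padded_memory(user_request_memory: float, percent_memory_padding: float = 10) -> float:
    """Assumed reading of percentMemoryPadding: requested memory plus that percentage."""
    return user_request_memory * (100 + percent_memory_padding) / 100


# A function asking for 1 CPU with cpuOverCommitRatio 2.0 requests half a CPU
print(cpu_request(1.0, 2.0))      # 0.5
# 1024 units of memory with memoryOverCommitRatio 4.0
print(memory_request(1024, 4.0))  # 256.0
```

With both ratios left at the default of 1.0, the request equals what the user asked for.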

Run standalone functions worker on Kubernetes

If you run functions worker standalone (that is, not embedded) on Kubernetes, you need to configure pulsarServiceUrl to be the URL of the broker and pulsarAdminUrl as the URL to the functions worker.

For example, both Pulsar brokers and functions workers run in the pulsar Kubernetes namespace. The brokers have a service called broker and the functions worker has a service called func-worker. The settings are as follows:


pulsarServiceUrl: pulsar://broker.pulsar:6650 # or pulsar+ssl://broker.pulsar:6651 if using TLS
pulsarAdminUrl: http://func-worker.pulsar:8080 # or https://func-worker:8443 if using TLS

Run RBAC in Kubernetes clusters

If you run RBAC in your Kubernetes cluster, make sure that the service account you use for running functions workers (or brokers, if functions workers run along with brokers) has permissions on the following Kubernetes APIs.

  • services
  • configmaps
  • pods
  • apps.statefulsets

The following is sufficient:


apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: functions-worker
rules:
  - apiGroups: [""]
    resources:
      - services
      - configmaps
      - pods
    verbs:
      - '*'
  - apiGroups:
      - apps
    resources:
      - statefulsets
    verbs:
      - '*'
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: functions-worker
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: functions-worker
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: functions-worker
subjects:
  - kind: ServiceAccount
    name: functions-worker

If the service account is not properly configured, an error message similar to the following is displayed:


22:04:27.696 [Timer-0] ERROR org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory - Error while trying to fetch configmap example-pulsar-4qvmb5gur3c6fc9dih0x1xn8b-function-worker-config at namespace pulsar
io.kubernetes.client.ApiException: Forbidden
at io.kubernetes.client.ApiClient.handleResponse(ApiClient.java:882) ~[io.kubernetes-client-java-2.0.0.jar:?]
at io.kubernetes.client.ApiClient.execute(ApiClient.java:798) ~[io.kubernetes-client-java-2.0.0.jar:?]
at io.kubernetes.client.apis.CoreV1Api.readNamespacedConfigMapWithHttpInfo(CoreV1Api.java:23673) ~[io.kubernetes-client-java-api-2.0.0.jar:?]
at io.kubernetes.client.apis.CoreV1Api.readNamespacedConfigMap(CoreV1Api.java:23655) ~[io.kubernetes-client-java-api-2.0.0.jar:?]
at org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory.fetchConfigMap(KubernetesRuntimeFactory.java:284) [org.apache.pulsar-pulsar-functions-runtime-2.4.0-42c3bf949.jar:2.4.0-42c3bf949]
at org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory$1.run(KubernetesRuntimeFactory.java:275) [org.apache.pulsar-pulsar-functions-runtime-2.4.0-42c3bf949.jar:2.4.0-42c3bf949]
at java.util.TimerThread.mainLoop(Timer.java:555) [?:1.8.0_212]
at java.util.TimerThread.run(Timer.java:505) [?:1.8.0_212]

Integrate Kubernetes secrets

In order to safely distribute secrets, Pulsar Functions can reference Kubernetes secrets. To enable this, set the secretsProviderConfiguratorClassName to org.apache.pulsar.functions.secretsproviderconfigurator.KubernetesSecretsProviderConfigurator.

You can create a secret in the namespace where your functions are deployed. For example, suppose you deploy functions to the pulsar-func Kubernetes namespace, and you have a secret named database-creds with a field named password, which you want to mount in the pod as an environment variable called DATABASE_PASSWORD. The following functions configuration enables you to reference that secret and mount the value as an environment variable in the pod.


tenant: "mytenant"
namespace: "mynamespace"
name: "myfunction"
topicName: "persistent://mytenant/mynamespace/myfuncinput"
className: "com.company.pulsar.myfunction"

secrets:
  # the secret will be mounted from the `password` field in the `database-creds` secret as an env var called `DATABASE_PASSWORD`
  DATABASE_PASSWORD:
    path: "database-creds"
    key: "password"
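
Inside the pod, the secret then appears as an ordinary environment variable. The following is a minimal sketch of reading it, assuming a Python function; the helper name `read_database_password` is hypothetical, and the Pulsar SDK is deliberately left out so that the snippet runs on its own.

```python
import os


def read_database_password() -> str:
    """Return the secret that the pod exposes as DATABASE_PASSWORD."""
    password = os.environ.get("DATABASE_PASSWORD")
    if password is None:
        # The variable is absent when the secret was not mounted into the pod
        raise RuntimeError(
            "DATABASE_PASSWORD is not set; check the `secrets` section of the function config"
        )
    return password
```

Failing loudly when the variable is missing makes a misconfigured `secrets` section visible at startup rather than at the first database call.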

Enable token authentication

When you enable authentication for your Pulsar cluster, you need a mechanism for the pod running your function to authenticate with the broker.

The org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider interface provides support for any authentication mechanism. The functionAuthProviderClassName setting in functions_worker.yml specifies the fully qualified class name of your implementation.

Pulsar includes an implementation of this interface for token authentication, and distributes the certificate authority via the same implementation. The configuration is similar to the following:


functionAuthProviderClassName: org.apache.pulsar.functions.auth.KubernetesSecretsTokenAuthProvider

For token authentication, the functions worker captures the token that is used to deploy (or update) the function. The token is saved as a secret and mounted into the pod.

For custom authentication or TLS, you need to implement this interface or use an alternative mechanism to provide authentication. If you use token authentication and TLS encryption to secure the communication with the cluster, Pulsar passes your certificate authority (CA) to the client, so the client has what it needs to authenticate the cluster and to trust a cluster that presents your signed certificate.

note

If you deploy functions with tokens that expire, the tokens saved for the function pods expire as well.
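
Because Pulsar tokens are JSON Web Tokens (JWT), you can inspect a token's `exp` claim before deploying a function with it. The following is a minimal sketch using only the Python standard library; the helper names are hypothetical, and the signature is not verified, so this is a sanity check rather than a validation step.

```python
import base64
import json
import time
from typing import Optional


def token_expiry(token: str) -> Optional[int]:
    """Return the `exp` claim (seconds since the epoch) of a JWT, or None if absent.

    The signature is NOT verified; only the payload is decoded.
    """
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("not a compact JWT: expected three dot-separated parts")
    payload = parts[1]
    # JWTs use unpadded base64url, so restore the padding before decoding
    payload += "=" * (-len(payload) % 4)
    claims = json.loads(base64.urlsafe_b64decode(payload))
    return claims.get("exp")


def is_expired(token: str, now: Optional[float] = None) -> bool:
    """True if the token carries an `exp` claim that is not in the future."""
    exp = token_expiry(token)
    if exp is None:
        return False  # no expiry claim: the token does not expire
    current = time.time() if now is None else now
    return current >= exp
```

A token without an `exp` claim returns `None` from `token_expiry`, which is the case that avoids the problem described in the note.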

Run clusters with authentication

When you run a functions worker in a standalone process (that is, not embedded in the broker) in a cluster with authentication, you must configure your functions worker to interact with the broker and authenticate incoming requests. So you need to configure properties that the broker requires for authentication or authorization.

For example, if you use token authentication, you need to configure the following properties in the functions_worker.yml file.


clientAuthenticationPlugin: org.apache.pulsar.client.impl.auth.AuthenticationToken
clientAuthenticationParameters: file:///etc/pulsar/token/admin-token.txt
configurationMetadataStoreUrl: zk:zookeeper-cluster:2181 # auth requires a connection to zookeeper
authenticationProviders:
  - "org.apache.pulsar.broker.authentication.AuthenticationProviderToken"
authorizationEnabled: true
authenticationEnabled: true
superUserRoles:
  - superuser
  - proxy
properties:
  tokenSecretKey: file:///etc/pulsar/jwt/secret # if using a secret token, key file must be DER-encoded
  tokenPublicKey: file:///etc/pulsar/jwt/public.key # if using public/private key tokens, key file must be DER-encoded

note

You must configure both sides: server-side authentication and authorization, so that the functions worker can authenticate incoming requests, and client-side authentication, so that the functions worker can authenticate itself when it communicates with the broker.

Customize Kubernetes runtime

The Kubernetes integration enables you to implement a class that customizes how manifests are generated. To configure it, set runtimeCustomizerClassName in the functions_worker.yml file to the fully qualified class name of your implementation. The class must implement the org.apache.pulsar.functions.runtime.kubernetes.KubernetesManifestCustomizer interface.

The functions (and sinks/sources) API provides a flag, customRuntimeOptions, which is passed to this interface.

To initialize the KubernetesManifestCustomizer, you can provide runtimeCustomizerConfig in the functions_worker.yml file. runtimeCustomizerConfig is passed to the public void initialize(Map<String, Object> config) function of the interface. runtimeCustomizerConfig is different from customRuntimeOptions in that runtimeCustomizerConfig is the same across all functions. If you provide both runtimeCustomizerConfig and customRuntimeOptions, you need to decide how to manage these two configurations in your implementation of KubernetesManifestCustomizer.

Pulsar includes a built-in implementation. To use it, set runtimeCustomizerClassName to org.apache.pulsar.functions.runtime.kubernetes.BasicKubernetesManifestCustomizer. The built-in implementation, initialized with runtimeCustomizerConfig, lets you pass a JSON document as customRuntimeOptions whose properties augment how the manifests are generated. If both runtimeCustomizerConfig and customRuntimeOptions are provided and they conflict, BasicKubernetesManifestCustomizer uses customRuntimeOptions to override runtimeCustomizerConfig.

Below is an example of customRuntimeOptions.


{
  "jobName": "jobname", // the k8s pod name to run this function instance
  "jobNamespace": "namespace", // the k8s namespace to run this function in
  "extraLabels": { // extra labels to attach to the statefulSet, service, and pods
    "extraLabel": "value"
  },
  "extraAnnotations": { // extra annotations to attach to the statefulSet, service, and pods
    "extraAnnotation": "value"
  },
  "nodeSelectorLabels": { // node selector labels to add on to the pod spec
    "customLabel": "value"
  },
  "tolerations": [ // tolerations to add to the pod spec
    {
      "key": "custom-key",
      "value": "value",
      "effect": "NoSchedule"
    }
  ],
  "resourceRequirements": { // values for cpu and memory should be defined as described here: https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container
    "requests": {
      "cpu": 1,
      "memory": "4G"
    },
    "limits": {
      "cpu": 2,
      "memory": "8G"
    }
  }
}
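
The `//` comments in the example above are annotations for the reader; standard JSON does not allow comments, so the document you actually pass should omit them. The following sketch builds a subset of the same options as a Python dictionary and serializes it. It also illustrates the override rule described earlier with a shallow merge; the merge helper, its top-level-only behavior, and the sample worker-wide values are assumptions for illustration, not the code of BasicKubernetesManifestCustomizer.

```python
import json

# Worker-wide defaults, as they might appear under runtimeCustomizerConfig (illustrative values)
runtime_customizer_config = {
    "jobNamespace": "pulsar-func",
    "extraAnnotations": {"owner": "platform"},
}

# Per-function options, passed as customRuntimeOptions
custom_runtime_options = {
    "jobNamespace": "namespace",
    "nodeSelectorLabels": {"customLabel": "value"},
    "resourceRequirements": {
        "requests": {"cpu": 1, "memory": "4G"},
        "limits": {"cpu": 2, "memory": "8G"},
    },
}


def effective_options(defaults: dict, overrides: dict) -> dict:
    """Per-function options win over worker-wide defaults on conflicting top-level keys."""
    return {**defaults, **overrides}


merged = effective_options(runtime_customizer_config, custom_runtime_options)
print(merged["jobNamespace"])  # namespace

# Comment-free JSON string suitable for use as customRuntimeOptions
custom_runtime_options_json = json.dumps(custom_runtime_options)
print(custom_runtime_options_json)
```

Generating the string with `json.dumps` avoids hand-written quoting mistakes when the options are passed on a command line or through the REST API.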

Run clusters with geo-replication

If you run multiple clusters tied together with geo-replication, it is important to use a different function namespace for each cluster. Otherwise, the functions share a namespace and can potentially be scheduled across clusters.

For example, if you have two clusters, east-1 and west-1, you can configure the functions workers for east-1 and west-1 respectively as follows.


pulsarFunctionsCluster: east-1
pulsarFunctionsNamespace: public/functions-east-1


pulsarFunctionsCluster: west-1
pulsarFunctionsNamespace: public/functions-west-1

This ensures the two different Functions Workers use distinct sets of topics for their internal coordination.
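
When there are more than two clusters, the per-cluster settings can be generated rather than written by hand. The following is a small sketch, assuming the `public/functions-<cluster>` naming pattern used in the example; the helper `worker_settings` is hypothetical.

```python
def worker_settings(cluster: str) -> dict:
    """Return the two functions worker settings that must differ per cluster."""
    return {
        "pulsarFunctionsCluster": cluster,
        "pulsarFunctionsNamespace": f"public/functions-{cluster}",
    }


clusters = ["east-1", "west-1"]
settings = {name: worker_settings(name) for name in clusters}

# Every cluster must end up with its own function namespace
namespaces = [s["pulsarFunctionsNamespace"] for s in settings.values()]
assert len(set(namespaces)) == len(clusters)
```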

Configure standalone functions worker

When configuring a standalone functions worker, you need to configure the properties that the broker requires, especially if you use TLS, so that the functions worker can communicate with the broker.

You need to configure the following required properties.


workerPort: 8080
workerPortTls: 8443 # when using TLS
tlsCertificateFilePath: /etc/pulsar/tls/tls.crt # when using TLS
tlsKeyFilePath: /etc/pulsar/tls/tls.key # when using TLS
tlsTrustCertsFilePath: /etc/pulsar/tls/ca.crt # when using TLS
pulsarServiceUrl: pulsar://broker.pulsar:6650/ # or pulsar+ssl://pulsar-prod-broker.pulsar:6651/ when using TLS
pulsarWebServiceUrl: http://broker.pulsar:8080/ # or https://pulsar-prod-broker.pulsar:8443/ when using TLS
useTls: true # when using TLS, critical!
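
A common mistake is to enable `useTls` while leaving a plain-text URL in place, or the reverse. The following sketch checks the settings for that mismatch once they are loaded into a dictionary. The helper `tls_mismatches` is hypothetical, and the scheme rules are taken from the comments in the example above.

```python
def tls_mismatches(settings: dict) -> list:
    """Return the URL settings whose scheme does not match the `useTls` flag."""
    use_tls = bool(settings.get("useTls", False))
    expected = {
        "pulsarServiceUrl": "pulsar+ssl://" if use_tls else "pulsar://",
        "pulsarWebServiceUrl": "https://" if use_tls else "http://",
    }
    return [
        key
        for key, scheme in expected.items()
        if not str(settings.get(key, "")).startswith(scheme)
    ]


example = {
    "useTls": True,
    "pulsarServiceUrl": "pulsar://broker.pulsar:6650/",        # plain-text scheme
    "pulsarWebServiceUrl": "https://broker.pulsar:8443/",
}
print(tls_mismatches(example))  # ['pulsarServiceUrl']
```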