Configure Functions runtime
You can use the following methods to run functions.
- Thread: Invoke functions threads in functions worker.
- Process: Invoke functions in processes forked by functions worker.
- Kubernetes: Submit functions as Kubernetes StatefulSets by functions worker.
Pulsar supports adding labels to the Kubernetes StatefulSets and services while launching functions, which facilitates selecting the target Kubernetes objects.
The differences of the thread and process modes are:
- Thread mode: when a function runs in thread mode, it runs on the same Java virtual machine (JVM) with functions worker.
- Process mode: when a function runs in process mode, it runs on the same machine that functions worker runs.
Configure thread runtime
It is easy to configure Thread runtime. In most cases, you do not need to configure anything. You can customize the thread group name with the following settings:
functionRuntimeFactoryClassName: org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory
functionRuntimeFactoryConfigs:
  threadGroupName: "Your Function Container Group"
Thread runtime is only supported in Java function.
Configure process runtime
When you enable Process runtime, you do not need to configure anything.
functionRuntimeFactoryClassName: org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory
functionRuntimeFactoryConfigs:
  # the directory for storing the function logs
  logDirectory:
  # change the jar location only when you put the java instance jar in a different location
  javaInstanceJarLocation:
  # change the python instance location only when you put the python instance jar in a different location
  pythonInstanceLocation:
  # change the extra dependencies location:
  extraFunctionDependenciesDir:
Process runtime is supported in Java, Python, and Go functions.
Configure Kubernetes runtime
When the functions worker generates Kubernetes manifests and apply the manifests, the Kubernetes runtime works. If you have run functions worker on Kubernetes, you can use the serviceAccount associated with the pod that the functions worker is running in. Otherwise, you can configure it to communicate with a Kubernetes cluster.
The manifests, generated by the functions worker, include a StatefulSet, a Service (used to communicate with the pods), and a Secret for auth credentials (when applicable). The StatefulSet manifest (by default) has a single pod, with the number of replicas determined by the "parallelism" of the function. On pod boot, the pod downloads the function payload (via the functions worker REST API). The pod's container image is configurable, but must have the functions runtime.
The Kubernetes runtime supports secrets, so you can create a Kubernetes secret and expose it as an environment variable in the pod. The Kubernetes runtime is extensible, you can implement classes and customize the way how to generate Kubernetes manifests, how to pass auth data to pods, and how to integrate secrets.
For the rules of translating Pulsar object names into Kubernetes resource labels, see here.
Basic configuration
It is easy to configure Kubernetes runtime. You can just uncomment the settings of kubernetesContainerFactory in the functions_worker.yaml file. The following is an example.
functionRuntimeFactoryClassName: org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory
functionRuntimeFactoryConfigs:
  # uri to kubernetes cluster, leave it to empty and it will use the kubernetes settings in function worker
  k8Uri:
  # the kubernetes namespace to run the function instances. it is `default`, if this setting is left to be empty
  jobNamespace:
  # The Kubernetes pod name to run the function instances. It is set to
  # `pf-<tenant>-<namespace>-<function_name>-<random_uuid(8)>` if this setting is left to be empty
  jobName:
  # the docker image to run function instance. by default it is `apachepulsar/pulsar`
  pulsarDockerImageName:
  # the docker image to run function instance according to different configurations provided by users.
  # By default it is `apachepulsar/pulsar`.
  # e.g:
  # functionDockerImages:
  #   JAVA: JAVA_IMAGE_NAME
  #   PYTHON: PYTHON_IMAGE_NAME
  #   GO: GO_IMAGE_NAME
  functionDockerImages:
  # "The image pull policy for image used to run function instance. By default it is `IfNotPresent`
  imagePullPolicy: IfNotPresent
  # the root directory of pulsar home directory in `pulsarDockerImageName`. by default it is `/pulsar`.
  # if you are using your own built image in `pulsarDockerImageName`, you need to set this setting accordingly
  pulsarRootDir:
  # The config admin CLI allows users to customize the configuration of the admin cli tool, such as:
  # `/bin/pulsar-admin and /bin/pulsarctl`. By default it is `/bin/pulsar-admin`. If you want to use `pulsarctl`
  # you need to set this setting accordingly
  configAdminCLI:
  # this setting only takes effects if `k8Uri` is set to null. if your function worker is running as a k8 pod,
  # setting this to true is let function worker to submit functions to the same k8s cluster as function worker
  # is running. setting this to false if your function worker is not running as a k8 pod.
  submittingInsidePod: false
  # setting the pulsar service url that pulsar function should use to connect to pulsar
  # if it is not set, it will use the pulsar service url configured in worker service
  pulsarServiceUrl:
  # setting the pulsar admin url that pulsar function should use to connect to pulsar
  # if it is not set, it will use the pulsar admin url configured in worker service
  pulsarAdminUrl:
  # The flag indicates to install user code dependencies. (applied to python package)
  installUserCodeDependencies:
  # The repository that pulsar functions use to download python dependencies
  pythonDependencyRepository:
  # The repository that pulsar functions use to download extra python dependencies
  pythonExtraDependencyRepository:
  # the custom labels that function worker uses to select the nodes for pods
  customLabels:
  # The expected metrics collection interval, in seconds
  expectedMetricsCollectionInterval: 30
  # Kubernetes Runtime will periodically checkback on
  # this configMap if defined and if there are any changes
  # to the kubernetes specific stuff, we apply those changes
  changeConfigMap:
  # The namespace for storing change config map
  changeConfigMapNamespace:
  # The ratio cpu request and cpu limit to be set for a function/source/sink.
  # The formula for cpu request is cpuRequest = userRequestCpu / cpuOverCommitRatio
  cpuOverCommitRatio: 1.0
  # The ratio memory request and memory limit to be set for a function/source/sink.
  # The formula for memory request is memoryRequest = userRequestMemory / memoryOverCommitRatio
  memoryOverCommitRatio: 1.0
  # The port inside the function pod which is used by the worker to communicate with the pod
  grpcPort: 9093
  # The port inside the function pod on which prometheus metrics are exposed
  metricsPort: 9094
  # The directory inside the function pod where nar packages will be extracted
  narExtractionDirectory:
  # The classpath where function instance files stored
  functionInstanceClassPath:
  # the directory for dropping extra function dependencies
  # if it is not an absolute path, it is relative to `pulsarRootDir`
  extraFunctionDependenciesDir:
  # Additional memory padding added on top of the memory requested by the function per on a per instance basis
  percentMemoryPadding: 10
  # The duration (in seconds) before the StatefulSet is deleted after a function stops or restarts.
  # Value must be a non-negative integer. 0 indicates the StatefulSet is deleted immediately.
  # Default is 5 seconds.
  gracePeriodSeconds: 5
gracePeriodSeconds is only available in 2.8.2 and later versions.
If you run functions worker embedded in a broker on Kubernetes, you can use the default settings.
Run standalone functions worker on Kubernetes
If you run functions worker standalone (that is, not embedded) on Kubernetes, you need to configure pulsarSerivceUrl to be the URL of the broker and pulsarAdminUrl as the URL to the functions worker.
For example, both Pulsar brokers and Function Workers run in the pulsar K8S namespace. The brokers have a service called brokers and the functions worker has a service called func-worker. The settings are as follows:
pulsarServiceUrl: pulsar://broker.pulsar:6650 // or pulsar+ssl://broker.pulsar:6651 if using TLS
pulsarAdminUrl: http://func-worker.pulsar:8080 // or https://func-worker:8443 if using TLS
Run RBAC in Kubernetes clusters
If you run RBAC in your Kubernetes cluster, make sure that the service account you use for running functions workers (or brokers, if functions workers run along with brokers) have permissions on the following Kubernetes APIs.
- services
- configmaps
- pods
- apps.statefulsets
The following is sufficient:
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRole
metadata:
  name: functions-worker
rules:
- apiGroups: [""]
  resources:
  - services
  - configmaps
  - pods
  verbs:
  - '*'
- apiGroups:
  - apps
  resources:
  - statefulsets
  verbs:
  - '*'
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: functions-worker
---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
  name: functions-worker
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: functions-worker
subjectsKubernetesSec:
- kind: ServiceAccount
  name: functions-worker
If the service-account is not properly configured, an error message similar to this is displayed:
22:04:27.696 [Timer-0] ERROR org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory - Error while trying to fetch configmap example-pulsar-4qvmb5gur3c6fc9dih0x1xn8b-function-worker-config at namespace pulsar
io.kubernetes.client.ApiException: Forbidden
	at io.kubernetes.client.ApiClient.handleResponse(ApiClient.java:882) ~[io.kubernetes-client-java-2.0.0.jar:?]
	at io.kubernetes.client.ApiClient.execute(ApiClient.java:798) ~[io.kubernetes-client-java-2.0.0.jar:?]
	at io.kubernetes.client.apis.CoreV1Api.readNamespacedConfigMapWithHttpInfo(CoreV1Api.java:23673) ~[io.kubernetes-client-java-api-2.0.0.jar:?]
	at io.kubernetes.client.apis.CoreV1Api.readNamespacedConfigMap(CoreV1Api.java:23655) ~[io.kubernetes-client-java-api-2.0.0.jar:?]
	at org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory.fetchConfigMap(KubernetesRuntimeFactory.java:284) [org.apache.pulsar-pulsar-functions-runtime-2.4.0-42c3bf949.jar:2.4.0-42c3bf949]
	at org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory$1.run(KubernetesRuntimeFactory.java:275) [org.apache.pulsar-pulsar-functions-runtime-2.4.0-42c3bf949.jar:2.4.0-42c3bf949]
	at java.util.TimerThread.mainLoop(Timer.java:555) [?:1.8.0_212]
	at java.util.TimerThread.run(Timer.java:505) [?:1.8.0_212]
Integrate Kubernetes secrets
In order to safely distribute secrets, Pulsar Functions can reference Kubernetes secrets. To enable this, set the secretsProviderConfiguratorClassName to org.apache.pulsar.functions.secretsproviderconfigurator.KubernetesSecretsProviderConfigurator.
You can create a secret in the namespace where your functions are deployed. For example, you deploy functions to the pulsar-func Kubernetes namespace, and you have a secret named database-creds with a field name password, which you want to mount in the pod as an environment variable called DATABASE_PASSWORD. The following functions configuration enables you to reference that secret and mount the value as an environment variable in the pod.
tenant: "mytenant"
namespace: "mynamespace"
name: "myfunction"
topicName: "persistent://mytenant/mynamespace/myfuncinput"
className: "com.company.pulsar.myfunction"
secrets:
  # the secret will be mounted from the `password` field in the `database-creds` secret as an env var called `DATABASE_PASSWORD`
  DATABASE_PASSWORD:
    path: "database-creds"
    key: "password"
Enable token authentication
When you enable authentication for your Pulsar cluster, you need a mechanism for the pod running your function to authenticate with the broker.
The org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider interface provides support for any authentication mechanism. The functionAuthProviderClassName in function-worker.yml is used to specify your path to this implementation.
Pulsar includes an implementation of this interface for token authentication, and distributes the certificate authority via the same implementation. The configuration is similar as follows:
functionAuthProviderClassName: org.apache.pulsar.functions.auth.KubernetesSecretsTokenAuthProvider
For token authentication, the functions worker captures the token that is used to deploy (or update) the function. The token is saved as a secret and mounted into the pod.
For custom authentication or TLS, you need to implement this interface or use an alternative mechanism to provide authentication. If you use token authentication and TLS encryption to secure the communication with the cluster, Pulsar passes your certificate authority (CA) to the client, so the client obtains what it needs to authenticate the cluster, and trusts the cluster with your signed certificate.
If you use tokens that expire when deploying functions, these tokens will expire.
Run clusters with authentication
When you run a functions worker in a standalone process (that is, not embedded in the broker) in a cluster with authentication, you must configure your functions worker to interact with the broker and authenticate incoming requests. So you need to configure properties that the broker requires for authentication or authorization.
For example, if you use token authentication, you need to configure the following properties in the function-worker.yml file.
clientAuthenticationPlugin: org.apache.pulsar.client.impl.auth.AuthenticationToken
clientAuthenticationParameters: file:///etc/pulsar/token/admin-token.txt
configurationStoreServers: zookeeper-cluster:2181 # auth requires a connection to zookeeper
authenticationProviders:
 - "org.apache.pulsar.broker.authentication.AuthenticationProviderToken"
authorizationEnabled: true
authenticationEnabled: true
superUserRoles:
  - superuser
  - proxy
properties:
  tokenSecretKey: file:///etc/pulsar/jwt/secret # if using a secret token, key file must be DER-encoded
  tokenPublicKey: file:///etc/pulsar/jwt/public.key # if using public/private key tokens, key file must be DER-encoded
You must configure both the Function Worker authorization or authentication for the server to authenticate requests and configure the client to be authenticated to communicate with the broker.
Customize Kubernetes runtime
The Kubernetes integration enables you to implement a class and customize how to generate manifests. You can configure it by setting runtimeCustomizerClassName in the functions-worker.yml file and use the fully qualified class name. You must implement the org.apache.pulsar.functions.runtime.kubernetes.KubernetesManifestCustomizer interface.
The functions (and sinks/sources) API provides a flag, customRuntimeOptions, which is passed to this interface.
To initialize the KubernetesManifestCustomizer, you can provide runtimeCustomizerConfig in the functions-worker.yml file. runtimeCustomizerConfig is passed to the public void initialize(Map<String, Object> config) function of the interface. runtimeCustomizerConfigis different from the customRuntimeOptions as runtimeCustomizerConfig is the same across all functions. If you provide both runtimeCustomizerConfig  and customRuntimeOptions, you need to decide how to manage these two configurations in your implementation of KubernetesManifestCustomizer.
Pulsar includes a built-in implementation. To use the basic implementation, set runtimeCustomizerClassName to org.apache.pulsar.functions.runtime.kubernetes.BasicKubernetesManifestCustomizer. The built-in implementation initialized with runtimeCustomizerConfig enables you to pass a JSON document as customRuntimeOptions with certain properties to augment, which decides how the manifests are generated. If both runtimeCustomizerConfig and customRuntimeOptions are provided, BasicKubernetesManifestCustomizer uses customRuntimeOptions to override the configuration if there are conflicts in these two configurations.
Below is an example of customRuntimeOptions.
{
  "jobName": "jobname", // the k8s pod name to run this function instance
  "jobNamespace": "namespace", // the k8s namespace to run this function in
  "extractLabels": {           // extra labels to attach to the statefulSet, service, and pods
    "extraLabel": "value"
  },
  "extraAnnotations": {        // extra annotations to attach to the statefulSet, service, and pods
    "extraAnnotation": "value"
  },
  "nodeSelectorLabels": {      // node selector labels to add on to the pod spec
    "customLabel": "value"
  },
  "tolerations": [             // tolerations to add to the pod spec
    {
      "key": "custom-key",
      "value": "value",
      "effect": "NoSchedule"
    }
  ],
  "resourceRequirements": {  // values for cpu and memory should be defined as described here: https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container
    "requests": {
      "cpu": 1,
      "memory": "4G"
    },
    "limits": {
      "cpu": 2,
      "memory": "8G"
    }
  }
}
Run clusters with geo-replication
If you run multiple clusters tied together with geo-replication, it is important to use a different function namespace for each cluster. Otherwise, the function shares a namespace and potentially schedule across clusters.
For example, if you have two clusters: east-1 and west-1, you can configure the functions workers for east-1 and west-1 perspectively as follows.
pulsarFunctionsCluster: east-1
pulsarFunctionsNamespace: public/functions-east-1
pulsarFunctionsCluster: west-1
pulsarFunctionsNamespace: public/functions-west-1
This ensures the two different Functions Workers use distinct sets of topics for their internal coordination.
Configure standalone functions worker
When configuring a standalone functions worker, you need to configure properties that the broker requires, especially if you use TLS. And then Functions Worker can communicate with the broker.
You need to configure the following required properties.
workerPort: 8080
workerPortTls: 8443 # when using TLS
tlsCertificateFilePath: /etc/pulsar/tls/tls.crt # when using TLS
tlsKeyFilePath: /etc/pulsar/tls/tls.key # when using TLS
tlsTrustCertsFilePath: /etc/pulsar/tls/ca.crt # when using TLS
pulsarServiceUrl: pulsar://broker.pulsar:6650/ # or pulsar+ssl://pulsar-prod-broker.pulsar:6651/ when using TLS
pulsarWebServiceUrl: http://broker.pulsar:8080/ # or https://pulsar-prod-broker.pulsar:8443/ when using TLS
useTls: true # when using TLS, critical!