Configure Functions runtime
You can use the following methods to run functions.
- Thread: The functions worker invokes functions in threads.
- Process: The functions worker invokes functions in processes that it forks.
- Kubernetes: The functions worker submits functions as Kubernetes StatefulSets.
Pulsar supports adding labels to the Kubernetes StatefulSets and services when launching functions, which makes it easier to select the target Kubernetes objects.
The differences between the thread and process modes are:
- Thread mode: when a function runs in thread mode, it runs in the same Java virtual machine (JVM) as the functions worker.
- Process mode: when a function runs in process mode, it runs on the same machine as the functions worker.
Configure thread runtime
Configuring the thread runtime is straightforward, and in most cases you do not need to configure anything. To customize the thread group name, use the following settings:
functionRuntimeFactoryClassName: org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory
functionRuntimeFactoryConfigs:
  threadGroupName: "Your Function Container Group"
The thread runtime supports Java functions only.
Configure process runtime
When you enable the process runtime, you do not need to configure anything else in most cases. The following settings are optional:
functionRuntimeFactoryClassName: org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory
functionRuntimeFactoryConfigs:
  # the directory for storing the function logs
  logDirectory:
  # change the jar location only when you put the Java instance jar in a different location
  javaInstanceJarLocation:
  # change the Python instance location only when you put the Python instance files in a different location
  pythonInstanceLocation:
  # the directory for extra function dependencies
  extraFunctionDependenciesDir:
The process runtime supports Java, Python, and Go functions.
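The language support described above can be expressed as a small check. The following is a minimal sketch. The helper name `check_runtime` is hypothetical, and the support matrix contains only what this section states for the thread and process runtimes. The Kubernetes runtime is deliberately not modeled.

```python
# Hypothetical helper: validate that a runtime factory can run a function
# written in a given language, using only the support matrix stated above.
THREAD_FACTORY = "org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory"
PROCESS_FACTORY = "org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory"

# Languages each runtime supports, per the text of this section.
SUPPORTED_LANGUAGES = {
    THREAD_FACTORY: {"java"},
    PROCESS_FACTORY: {"java", "python", "go"},
}


def check_runtime(factory_class: str, language: str) -> None:
    """Raise ValueError if the runtime cannot run functions in this language."""
    supported = SUPPORTED_LANGUAGES.get(factory_class)
    if supported is None:
        raise ValueError(f"unknown runtime factory: {factory_class}")
    if language.lower() not in supported:
        raise ValueError(
            f"{factory_class} does not support {language} functions "
            f"(supported: {', '.join(sorted(supported))})"
        )
```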
Configure Kubernetes runtime
The Kubernetes runtime works by having the functions worker generate Kubernetes manifests and apply them. If you run the functions worker on Kubernetes, you can use the serviceAccount associated with the pod that the functions worker is running in. Otherwise, you can configure it to communicate with a Kubernetes cluster.
The manifests generated by the functions worker include a StatefulSet, a Service (used to communicate with the pods), and a Secret for auth credentials (when applicable). By default, each pod of the StatefulSet runs one function instance, and the number of replicas is determined by the "parallelism" of the function. On boot, the pod downloads the function payload through the functions worker REST API. The pod's container image is configurable, but it must include the functions runtime.
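The relationship between parallelism and replicas can be illustrated with a stripped-down StatefulSet manifest built as a plain dict. This is a hypothetical sketch and not the manifest Pulsar actually generates. The real one contains commands, resources, volumes, and more. The function name `build_statefulset` and the `function` label key are assumptions made for illustration.

```python
def build_statefulset(function_name: str, parallelism: int, image: str) -> dict:
    """Return a minimal StatefulSet manifest as a plain dict (illustrative).

    Only the replicas/parallelism relationship is modeled; the label key
    "function" is a hypothetical choice, not Pulsar's actual label.
    """
    if parallelism < 1:
        raise ValueError("parallelism must be at least 1")
    labels = {"function": function_name}
    return {
        "apiVersion": "apps/v1",
        "kind": "StatefulSet",
        "metadata": {"name": function_name, "labels": dict(labels)},
        "spec": {
            # One replica (pod) per function instance.
            "replicas": parallelism,
            # Name of the Service that governs this StatefulSet.
            "serviceName": function_name,
            "selector": {"matchLabels": dict(labels)},
            "template": {
                "metadata": {"labels": dict(labels)},
                "spec": {
                    "containers": [{"name": function_name, "image": image}]
                },
            },
        },
    }
```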
The Kubernetes runtime supports secrets, so you can create a Kubernetes secret and expose it as an environment variable in the pod. The Kubernetes runtime is also extensible: you can implement classes to customize how Kubernetes manifests are generated, how auth data is passed to pods, and how secrets are integrated.
For the rules of translating Pulsar object names into Kubernetes resource labels, see here.
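As a rough illustration of why names need translating, the following sketch applies only the generic Kubernetes label-value constraints. A label value can be at most 63 characters, must start and end with an alphanumeric character, and may contain '-', '_' and '.' in between. The sketch replaces every other character with '-'. It does not reproduce Pulsar's own translation rules, and `to_label_value` is a hypothetical name.

```python
import re

# Generic Kubernetes label-value syntax: empty, or alphanumeric at both ends
# with '-', '_' and '.' allowed in between; at most 63 characters.
LABEL_VALUE_RE = re.compile(r"^(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])?$")
MAX_LABEL_LENGTH = 63


def to_label_value(name: str) -> str:
    """Turn an arbitrary name into a valid label value (illustrative rules)."""
    # Replace every character that is not allowed in a label value with '-'.
    value = re.sub(r"[^A-Za-z0-9_.-]", "-", name)
    # Truncate to the maximum length, then trim characters that may not
    # appear at the start or end of a label value.
    value = value[:MAX_LABEL_LENGTH]
    return value.strip("-_.")
```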
Basic configuration
Configuring the Kubernetes runtime is straightforward: uncomment the settings of kubernetesContainerFactory in the functions_worker.yml file. The following is an example.
functionRuntimeFactoryClassName: org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory
functionRuntimeFactoryConfigs:
  # URI of the Kubernetes cluster. Leave it empty to use the Kubernetes settings of the functions worker.
  k8Uri:
  # the Kubernetes namespace to run the function instances in. If left empty, it is `default`.
  jobNamespace:
  # the Docker image to run function instances. By default, it is `apachepulsar/pulsar`.
  pulsarDockerImageName:
  # the Docker images to run function instances, per function runtime, as provided by users.
  # By default, it is `apachepulsar/pulsar`.
  # e.g.:
  # functionDockerImages:
  #   JAVA: JAVA_IMAGE_NAME
  #   PYTHON: PYTHON_IMAGE_NAME
  #   GO: GO_IMAGE_NAME
  functionDockerImages:
  # the Pulsar root directory in `pulsarDockerImageName`. By default, it is `/pulsar`.
  # If you use your own image in `pulsarDockerImageName`, set this accordingly.
  pulsarRootDir:
  # This setting only takes effect if `k8Uri` is set to null. If your functions worker runs as a k8s pod,
  # set this to true so that the functions worker submits functions to the same k8s cluster it runs in.
  # Set this to false if your functions worker does not run as a k8s pod.
  submittingInsidePod: false
  # the Pulsar service URL that Pulsar Functions use to connect to Pulsar.
  # If it is not set, the Pulsar service URL configured in the worker service is used.
  pulsarServiceUrl:
  # the Pulsar admin URL that Pulsar Functions use to connect to Pulsar.
  # If it is not set, the Pulsar admin URL configured in the worker service is used.
  pulsarAdminUrl:
  # the custom labels that the functions worker uses to select the nodes for pods
  customLabels:
  # the directory for dropping extra function dependencies.
  # If it is not an absolute path, it is relative to `pulsarRootDir`.
  extraFunctionDependenciesDir:
  # additional memory padding added on top of the memory requested by the function, on a per-instance basis
  percentMemoryPadding: 10
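The effect of percentMemoryPadding can be estimated with simple arithmetic. The following sketch assumes that the padding is a percentage of the requested memory, added on top of that request and rounded down to whole bytes. It is an approximation for planning and not a copy of Pulsar's code. `padded_memory_bytes` is a hypothetical name.

```python
def padded_memory_bytes(requested_bytes: int, percent_memory_padding: int = 10) -> int:
    """Estimate the per-instance memory request after padding.

    Assumption: padding = requested * percent / 100 (floor), added on top
    of the memory the function requests.
    """
    if requested_bytes < 0 or percent_memory_padding < 0:
        raise ValueError("values must be non-negative")
    return requested_bytes + requested_bytes * percent_memory_padding // 100
```

Under this assumption, with the default padding of 10, a function that requests 1 GiB (1073741824 bytes) would be scheduled with 1181116006 bytes per instance.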
If you run functions worker embedded in a broker on Kubernetes, you can use the default settings.
Run standalone functions worker on Kubernetes
If you run the functions worker standalone (that is, not embedded) on Kubernetes, you need to set pulsarServiceUrl to the URL of the broker and pulsarAdminUrl to the URL of the functions worker.
For example, suppose both Pulsar brokers and functions workers run in the pulsar K8S namespace, the brokers have a service called broker, and the functions worker has a service called func-worker. The settings are as follows:
pulsarServiceUrl: pulsar://broker.pulsar:6650 # or pulsar+ssl://broker.pulsar:6651 if using TLS
pulsarAdminUrl: http://func-worker.pulsar:8080 # or https://func-worker.pulsar:8443 if using TLS
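The URLs above follow the `<service>.<namespace>` DNS pattern, with a different scheme and port depending on whether TLS is used. This sketch derives both settings from the service names and the ports shown in the example. `worker_urls` is a hypothetical helper, and the default service names are the example's names, not defaults that Pulsar enforces.

```python
def worker_urls(namespace: str, use_tls: bool,
                broker_service: str = "broker",
                worker_service: str = "func-worker") -> dict:
    """Build pulsarServiceUrl and pulsarAdminUrl from Kubernetes service names.

    Ports follow the example: 6650/6651 for the broker and 8080/8443 for
    the functions worker (plain/TLS).
    """
    if use_tls:
        service_url = f"pulsar+ssl://{broker_service}.{namespace}:6651"
        admin_url = f"https://{worker_service}.{namespace}:8443"
    else:
        service_url = f"pulsar://{broker_service}.{namespace}:6650"
        admin_url = f"http://{worker_service}.{namespace}:8080"
    return {"pulsarServiceUrl": service_url, "pulsarAdminUrl": admin_url}
```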
Run RBAC in Kubernetes clusters
If you run RBAC in your Kubernetes cluster, make sure that the service account you use for running functions workers (or brokers, if functions workers run along with brokers) has permissions on the following Kubernetes APIs.
- services
- configmaps
- pods
- apps.statefulsets
The following manifests grant sufficient permissions:
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: functions-worker
rules:
- apiGroups: [""]
  resources:
  - services
  - configmaps
  - pods
  verbs:
  - '*'
- apiGroups:
  - apps
  resources:
  - statefulsets
  verbs:
  - '*'
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: functions-worker
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: functions-worker
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: functions-worker
subjects:
- kind: ServiceAccount
  name: functions-worker
  namespace: pulsar # the namespace where the functions worker runs
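Before deploying, you can sanity-check that a ClusterRole covers the four required resources. The following sketch inspects the manifest as a parsed dict. It is a simplification: only rules that grant the '*' verb count, and it ignores aggregation, resourceNames, and explicit verb lists. To check a live cluster, `kubectl auth can-i` is the authoritative tool, for example `kubectl auth can-i create statefulsets --as=system:serviceaccount:pulsar:functions-worker`. `missing_permissions` is a hypothetical name.

```python
# (apiGroup, resource) pairs the functions worker needs, per the list above.
REQUIRED = {
    ("", "services"),
    ("", "configmaps"),
    ("", "pods"),
    ("apps", "statefulsets"),
}


def missing_permissions(cluster_role: dict) -> set:
    """Return the required (apiGroup, resource) pairs not granted with '*'.

    Simplification: only rules whose verbs include '*' count as grants.
    """
    granted = set()
    for rule in cluster_role.get("rules", []):
        if "*" not in rule.get("verbs", []):
            continue
        for group in rule.get("apiGroups", []):
            for resource in rule.get("resources", []):
                granted.add((group, resource))
    return REQUIRED - granted
```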
If the service account is not properly configured, an error message similar to the following is displayed:
22:04:27.696 [Timer-0] ERROR org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory - Error while trying to fetch configmap example-pulsar-4qvmb5gur3c6fc9dih0x1xn8b-function-worker-config at namespace pulsar
io.kubernetes.client.ApiException: Forbidden
at io.kubernetes.client.ApiClient.handleResponse(ApiClient.java:882) ~[io.kubernetes-client-java-2.0.0.jar:?]
at io.kubernetes.client.ApiClient.execute(ApiClient.java:798) ~[io.kubernetes-client-java-2.0.0.jar:?]
at io.kubernetes.client.apis.CoreV1Api.readNamespacedConfigMapWithHttpInfo(CoreV1Api.java:23673) ~[io.kubernetes-client-java-api-2.0.0.jar:?]
at io.kubernetes.client.apis.CoreV1Api.readNamespacedConfigMap(CoreV1Api.java:23655) ~[io.kubernetes-client-java-api-2.0.0.jar:?]
at org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory.fetchConfigMap(KubernetesRuntimeFactory.java:284) [org.apache.pulsar-pulsar-functions-runtime-2.4.0-42c3bf949.jar:2.4.0-42c3bf949]
at org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory$1.run(KubernetesRuntimeFactory.java:275) [org.apache.pulsar-pulsar-functions-runtime-2.4.0-42c3bf949.jar:2.4.0-42c3bf949]
at java.util.TimerThread.mainLoop(Timer.java:555) [?:1.8.0_212]
at java.util.TimerThread.run(Timer.java:505) [?:1.8.0_212]
Integrate Kubernetes secrets
To distribute secrets safely, Pulsar Functions can reference Kubernetes secrets. To enable this, set secretsProviderConfiguratorClassName to org.apache.pulsar.functions.secretsproviderconfigurator.KubernetesSecretsProviderConfigurator.
You can create a secret in the namespace where your functions are deployed. For example, suppose you deploy functions to the pulsar-func Kubernetes namespace, and you have a secret named database-creds with a field named password, which you want to mount in the pod as an environment variable called DATABASE_PASSWORD. The following function configuration enables you to reference that secret and mount the value as an environment variable in the pod.
tenant: "mytenant"
namespace: "mynamespace"
name: "myfunction"
topicName: "persistent://mytenant/mynamespace/myfuncinput"
className: "com.company.pulsar.myfunction"
secrets:
  # the secret will be mounted from the `password` field in the `database-creds` secret as an env var called `DATABASE_PASSWORD`
  DATABASE_PASSWORD:
    path: "database-creds"
    key: "password"
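On the Kubernetes side, an environment variable sourced from a secret uses the standard `valueFrom.secretKeyRef` shape. The following sketch maps the `secrets` block above to that shape. In the sketch, `path` is the Secret name and `key` is the field inside it. This illustrates the Kubernetes container spec and does not reproduce KubernetesSecretsProviderConfigurator's code. `secrets_to_env` is a hypothetical name.

```python
def secrets_to_env(secrets: dict) -> list:
    """Map a function's `secrets` config to Kubernetes container env entries.

    Each key becomes the env var name; `path` is used as the Kubernetes
    Secret name and `key` as the field inside that Secret.
    """
    env = []
    for env_name, ref in secrets.items():
        env.append({
            "name": env_name,
            "valueFrom": {
                "secretKeyRef": {"name": ref["path"], "key": ref["key"]}
            },
        })
    return env
```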
Enable token authentication
When you enable authentication for your Pulsar cluster, you need a mechanism for the pod running your function to authenticate with the broker.
The org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider interface provides support for any authentication mechanism. Use the functionAuthProviderClassName setting in functions_worker.yml to specify the fully qualified class name of your implementation.
Pulsar includes an implementation of this interface for token authentication, which also distributes the certificate authority. The configuration is as follows:
functionAuthProviderClassName: org.apache.pulsar.functions.auth.KubernetesSecretsTokenAuthProvider
For token authentication, the functions worker captures the token that is used to deploy (or update) the function. The token is saved as a secret and mounted into the pod.
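To make "saved as a secret" concrete, the following sketch shows the general shape of a Kubernetes Secret that holds a token. Values under `data` are base64-encoded. The Secret name, namespace, and the `token` field name are hypothetical, and the sketch does not reproduce the manifest that KubernetesSecretsTokenAuthProvider creates.

```python
import base64


def token_secret(name: str, namespace: str, token: str) -> dict:
    """Build a Kubernetes Secret that stores an auth token (illustrative).

    The field name "token" is a hypothetical choice for this sketch.
    """
    encoded = base64.b64encode(token.encode("utf-8")).decode("ascii")
    return {
        "apiVersion": "v1",
        "kind": "Secret",
        "metadata": {"name": name, "namespace": namespace},
        "type": "Opaque",
        # Values under `data` must be base64-encoded.
        "data": {"token": encoded},
    }
```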
For custom authentication or TLS, you need to implement this interface or use an alternative mechanism to provide authentication. If you use token authentication and TLS encryption to secure the communication with the cluster, Pulsar passes your certificate authority (CA) to the client, so the client has what it needs to authenticate the cluster and trusts the cluster with your signed certificate.
If you deploy functions with tokens that have an expiration time, the tokens mounted into the function pods expire as well, and after that the functions can no longer authenticate with the broker.
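Pulsar tokens are JSON Web Tokens (JWTs), so you can read the optional `exp` claim before deploying to see whether the token will expire. The following sketch decodes the payload segment without verifying the signature. It is suitable only as a local pre-deployment check, not for authentication. The function names are hypothetical.

```python
import base64
import json
import time
from typing import Optional


def _b64url_decode(segment: str) -> bytes:
    # JWT segments are base64url without padding; restore it before decoding.
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def token_expiry(token: str) -> Optional[int]:
    """Return the `exp` claim (seconds since the epoch), or None if absent.

    The signature is NOT verified; this only inspects the payload.
    """
    payload = json.loads(_b64url_decode(token.split(".")[1]))
    return payload.get("exp")


def expires_within(token: str, seconds: float, now: Optional[float] = None) -> bool:
    """True if the token has an expiry that falls within `seconds` from now."""
    exp = token_expiry(token)
    if exp is None:
        return False  # no `exp` claim: the token does not expire
    current = time.time() if now is None else now
    return exp <= current + seconds
```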
Run clusters with authentication
When you run a functions worker in a standalone process (that is, not embedded in the broker) in a cluster with authentication, you must configure your functions worker to interact with the broker and to authenticate incoming requests. This means configuring the properties that the broker requires for authentication or authorization.
For example, if you use token authentication, you need to configure the following properties in the functions_worker.yml file.
clientAuthenticationPlugin: org.apache.pulsar.client.impl.auth.AuthenticationToken
clientAuthenticationParameters: file:///etc/pulsar/token/admin-token.txt
configurationStoreServers: zookeeper-cluster:2181 # auth requires a connection to zookeeper
authenticationProviders:
  - "org.apache.pulsar.broker.authentication.AuthenticationProviderToken"
authorizationEnabled: true
authenticationEnabled: true
superUserRoles:
  - superuser
  - proxy
properties:
  tokenSecretKey: file:///etc/pulsar/jwt/secret # if using a secret token
  tokenPublicKey: file:///etc/pulsar/jwt/public.key # if using public/private key tokens
You must configure both the server side of the functions worker (authentication and authorization, so that it can authenticate incoming requests) and its client side (client authentication, so that it can communicate with the broker).
Customize Kubernetes runtime
The Kubernetes integration enables you to implement a class that customizes how manifests are generated. To configure it, set runtimeCustomizerClassName in the functions_worker.yml file to the fully qualified class name. The class must implement the org.apache.pulsar.functions.runtime.kubernetes.KubernetesManifestCustomizer interface.
The functions (and sinks/sources) API provides a flag, customRuntimeOptions, which is passed to this interface.
To initialize the KubernetesManifestCustomizer, you can provide runtimeCustomizerConfig in the functions_worker.yml file. runtimeCustomizerConfig is passed to the public void initialize(Map<String, Object> config) function of the interface. runtimeCustomizerConfig differs from customRuntimeOptions in that runtimeCustomizerConfig is the same across all functions. If you provide both runtimeCustomizerConfig and customRuntimeOptions, you need to decide how to manage these two configurations in your implementation of KubernetesManifestCustomizer.
Pulsar includes a built-in implementation. To use it, set runtimeCustomizerClassName to org.apache.pulsar.functions.runtime.kubernetes.BasicKubernetesManifestCustomizer. When initialized with runtimeCustomizerConfig, the built-in implementation enables you to pass a JSON document as customRuntimeOptions with certain properties that determine how the manifests are generated. If both runtimeCustomizerConfig and customRuntimeOptions are provided, BasicKubernetesManifestCustomizer uses customRuntimeOptions to override conflicting values in runtimeCustomizerConfig.
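A custom KubernetesManifestCustomizer has to choose its own policy for combining the two configurations. The following sketch shows one plausible policy. Map-valued fields are merged key by key, every other field from customRuntimeOptions replaces the worker-wide value, and in both cases the per-function value wins. This is a hypothetical illustration and not necessarily how BasicKubernetesManifestCustomizer merges internally. The names `merge_runtime_options` and `MAP_FIELDS` are assumptions.

```python
import copy

# Map-valued fields (from the example below) that are merged key by key.
MAP_FIELDS = {"extraLabels", "extraAnnotations", "nodeSelectorLabels"}


def merge_runtime_options(customizer_config: dict, custom_runtime_options: dict) -> dict:
    """Overlay per-function options on the worker-wide config (illustrative).

    Map fields are merged with per-function keys winning; any other field
    present in custom_runtime_options replaces the worker-wide value.
    Neither input is mutated.
    """
    merged = copy.deepcopy(customizer_config)
    for key, value in custom_runtime_options.items():
        if key in MAP_FIELDS and isinstance(merged.get(key), dict):
            merged[key] = {**merged[key], **value}
        else:
            merged[key] = copy.deepcopy(value)
    return merged
```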
Below is an example of customRuntimeOptions. The // comments are explanatory only; JSON does not allow comments, so remove them before use.
{
  "jobName": "jobname", // the k8s pod name to run this function instance
  "jobNamespace": "namespace", // the k8s namespace to run this function in
  "extraLabels": { // extra labels to attach to the statefulSet, service, and pods
    "extraLabel": "value"
  },
  "extraAnnotations": { // extra annotations to attach to the statefulSet, service, and pods
    "extraAnnotation": "value"
  },
  "nodeSelectorLabels": { // node selector labels to add on to the pod spec
    "customLabel": "value"
  },
  "tolerations": [ // tolerations to add to the pod spec
    {
      "key": "custom-key",
      "value": "value",
      "effect": "NoSchedule"
    }
  ],
  "resourceRequirements": { // values for cpu and memory should be defined as described here: https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container
    "requests": {
      "cpu": 1,
      "memory": "4G"
    },
    "limits": {
      "cpu": 2,
      "memory": "8G"
    }
  }
}
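One way to produce a valid JSON string for customRuntimeOptions is to build it as a data structure and serialize it. The following sketch mirrors a subset of the example values above. The resulting string can then be passed when creating a function, for example through the `--custom-runtime-options` argument of `pulsar-admin functions create`. The variable names are illustrative.

```python
import json

# customRuntimeOptions as a dict; values mirror the example above.
custom_runtime_options = {
    "jobNamespace": "namespace",
    "extraLabels": {"extraLabel": "value"},
    "nodeSelectorLabels": {"customLabel": "value"},
    "tolerations": [
        {"key": "custom-key", "value": "value", "effect": "NoSchedule"}
    ],
    "resourceRequirements": {
        "requests": {"cpu": 1, "memory": "4G"},
        "limits": {"cpu": 2, "memory": "8G"},
    },
}

# Compact, comment-free JSON suitable for a command-line argument.
options_json = json.dumps(custom_runtime_options, separators=(",", ":"))
print(options_json)
```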
Run clusters with geo-replication
If you run multiple clusters tied together with geo-replication, it is important to use a different function namespace for each cluster. Otherwise, the functions share a namespace and can potentially be scheduled across clusters.
For example, if you have two clusters, east-1 and west-1, you can configure the functions workers for east-1 and west-1 respectively as follows.
# functions_worker.yml for east-1
pulsarFunctionsCluster: east-1
pulsarFunctionsNamespace: public/functions-east-1

# functions_worker.yml for west-1
pulsarFunctionsCluster: west-1
pulsarFunctionsNamespace: public/functions-west-1
This ensures that the two functions workers use distinct sets of topics for their internal coordination.
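If you template worker configurations for many clusters, a simple guard can catch two clusters that accidentally share a functions namespace. The following sketch checks a mapping from cluster name to worker settings. The function name and the dict layout are assumptions for illustration.

```python
def check_distinct_namespaces(worker_configs: dict) -> None:
    """Raise ValueError if two clusters share a pulsarFunctionsNamespace.

    worker_configs maps a cluster name to its functions worker settings.
    """
    seen = {}
    for cluster, cfg in worker_configs.items():
        namespace = cfg["pulsarFunctionsNamespace"]
        if namespace in seen:
            raise ValueError(
                f"clusters {seen[namespace]} and {cluster} share "
                f"functions namespace {namespace}"
            )
        seen[namespace] = cluster


# Settings from the example above.
worker_configs = {
    "east-1": {"pulsarFunctionsCluster": "east-1",
               "pulsarFunctionsNamespace": "public/functions-east-1"},
    "west-1": {"pulsarFunctionsCluster": "west-1",
               "pulsarFunctionsNamespace": "public/functions-west-1"},
}
check_distinct_namespaces(worker_configs)  # passes: namespaces differ
```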
Configure standalone functions worker
When configuring a standalone functions worker, you need to configure the properties that the broker requires, especially if you use TLS, so that the functions worker can communicate with the broker.
You need to configure the following required properties.
workerPort: 8080
workerPortTls: 8443 # when using TLS
tlsCertificateFilePath: /etc/pulsar/tls/tls.crt # when using TLS
tlsKeyFilePath: /etc/pulsar/tls/tls.key # when using TLS
tlsTrustCertsFilePath: /etc/pulsar/tls/ca.crt # when using TLS
pulsarServiceUrl: pulsar://broker.pulsar:6650/ # or pulsar+ssl://pulsar-prod-broker.pulsar:6651/ when using TLS
pulsarWebServiceUrl: http://broker.pulsar:8080/ # or https://pulsar-prod-broker.pulsar:8443/ when using TLS
useTls: true # when using TLS, critical!
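Because useTls must agree with the other settings, a quick consistency check can help before you start the worker. The following sketch flags missing TLS paths and non-TLS URL schemes when useTls is true. The rules come from the comments in the example above. The function name is hypothetical, and this is not a validation that Pulsar itself performs.

```python
def tls_problems(cfg: dict) -> list:
    """List inconsistencies between useTls and the other settings (sketch)."""
    problems = []
    if not cfg.get("useTls"):
        return problems
    # Settings the example marks as needed "when using TLS".
    for key in ("workerPortTls", "tlsCertificateFilePath",
                "tlsKeyFilePath", "tlsTrustCertsFilePath"):
        if not cfg.get(key):
            problems.append(f"{key} is not set")
    if not str(cfg.get("pulsarServiceUrl", "")).startswith("pulsar+ssl://"):
        problems.append("pulsarServiceUrl should use pulsar+ssl://")
    if not str(cfg.get("pulsarWebServiceUrl", "")).startswith("https://"):
        problems.append("pulsarWebServiceUrl should use https://")
    return problems
```

Applied to the example as written, which sets useTls: true but keeps the plain pulsar:// and http:// URLs, the check reports both URLs. This is a reminder to switch to the TLS variants given in the comments.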