Configure Kubernetes runtime
The Kubernetes runtime works when a function worker generates and applies Kubernetes manifests. The manifests generated by a function worker include:
- a StatefulSet
  By default, the StatefulSet manifest has a single pod with a number of replicas. The number is determined by the parallelism of the function. On boot, the pod downloads the function payload (via the function worker REST API). The pod's container image is configurable if the function runtime is configured.
- a Service (used to communicate with the pod)
- a Secret for authenticating credentials (when applicable). The Kubernetes runtime supports secrets. You can create a Kubernetes secret and expose it as an environment variable in the pod.
For the rules of translating Pulsar object names into Kubernetes resource labels, see instructions.
Configure basic settings
To quickly configure a Kubernetes runtime, you can use the default settings of KubernetesRuntimeFactoryConfig in the conf/functions_worker.yml file.
If you have set up a Pulsar cluster on Kubernetes using the Helm chart, which means function workers have also been set up on Kubernetes, you can use the serviceAccount associated with the pod where the function worker is running. Otherwise, you can configure function workers to communicate with a Kubernetes cluster by setting k8Uri under functionRuntimeFactoryConfigs.
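For a function worker that runs outside the Kubernetes cluster, the worker configuration might look like the following sketch. The API server address and the namespace are placeholder values, jobNamespace is included only as an assumption about a typical setup, and the exact set of supported keys depends on your Pulsar version.

```yaml
# Function worker configuration (sketch; values are placeholders)
functionRuntimeFactoryClassName: org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory
functionRuntimeFactoryConfigs:
  # Address of the Kubernetes API server (placeholder).
  k8Uri: "https://kubernetes.example.com:6443"
  # Namespace in which function pods are created (assumed value).
  jobNamespace: "pulsar-func"
```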
Integrate Kubernetes secrets
A Secret in Kubernetes is an object that holds confidential data such as a password, a token, or a key. When you create a secret in the Kubernetes namespace where your functions are deployed, functions can safely reference and distribute it. To enable this feature, set secretsProviderConfiguratorClassName to org.apache.pulsar.functions.secretsproviderconfigurator.KubernetesSecretsProviderConfigurator in the conf/functions_worker.yml file.
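In the worker configuration file, this setting is a single top-level key, as in this minimal sketch:

```yaml
# Function worker configuration (sketch)
secretsProviderConfiguratorClassName: org.apache.pulsar.functions.secretsproviderconfigurator.KubernetesSecretsProviderConfigurator
```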
For example, you deploy a function to the pulsar-func Kubernetes namespace, and you have a secret named database-creds with a field named password, which you want to mount in the pod as an environment variable named DATABASE_PASSWORD. The following function configuration enables the function to reference the secret and mount the value as an environment variable in the pod.
tenant: "mytenant"
namespace: "mynamespace"
name: "myfunction"
inputs: [ "persistent://mytenant/mynamespace/myfuncinput" ]
className: "com.company.pulsar.myfunction"
secrets:
  # the secret will be mounted from the `password` field in the `database-creds` secret as an env var called `DATABASE_PASSWORD`
  DATABASE_PASSWORD:
    path: "database-creds"
    key: "password"
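For completeness, the database-creds secret referenced in this example could be created from a Kubernetes manifest like the sketch below. The password value is a placeholder, and the manifest assumes the pulsar-func namespace already exists.

```yaml
# Kubernetes Secret manifest (sketch; the password value is a placeholder)
apiVersion: v1
kind: Secret
metadata:
  name: database-creds
  namespace: pulsar-func
type: Opaque
stringData:
  # The field that the function configuration refers to with `key: "password"`.
  password: "change-me"
```

Applying it with kubectl apply -f before deploying the function makes the secret available to the function pod.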
Enable token authentication
When you use token authentication, TLS encryption, or custom authentication to secure the communication with your Pulsar cluster, Pulsar passes your certificate authority (CA) to the client, so the client can authenticate the cluster with your signed certificate.
To enable token authentication for your Pulsar cluster, you need to specify a mechanism for the pod running your function to authenticate with the broker, by implementing the org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider interface.
- For token authentication, Pulsar includes an implementation of the above interface to distribute the CA. The function worker captures the token used to deploy (or update) the function, saves it as a secret, and mounts it into the pod.
  The configuration in the conf/functions_worker.yml file is as follows. functionAuthProviderClassName specifies the fully qualified class name of this implementation.
    functionAuthProviderClassName: org.apache.pulsar.functions.auth.KubernetesSecretsTokenAuthProvider
- For TLS or custom authentication, you can either implement the org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider interface or use an alternative mechanism.
If the token you use to deploy the function has an expiration date, you may need to deploy the function again after it expires.
Enable Kubernetes service account token projection for function pod authentication
The KubernetesServiceAccountTokenAuthProvider uses service account token volume projections to mount a token into the function's pod. The function worker and broker can verify this token using OpenID Connect. The primary benefit of this integration is that tokens have a short time to live, are managed by Kubernetes, and do not inherit the permission used to create the function.
This feature requires that the broker and the function worker are configured to use the AuthenticationProviderOpenID. Documentation to enable this provider can be found here.
Here is an example configuration for the function worker to utilize this feature:
functionAuthProviderClassName: "org.apache.pulsar.functions.auth.KubernetesServiceAccountTokenAuthProvider"
kubernetesContainerFactory:
  kubernetesFunctionAuthProviderConfig:
    # Required
    serviceAccountTokenExpirationSeconds: "600"
    serviceAccountTokenAudience: "the-required-audience"
    # Optional
    brokerClientTrustCertsSecretName: "my-secret-pulsar-broker-client-trust-certs"
The function pod deploys with the default Kubernetes service account for the target namespace. Because the service account name maps to the sub claim on the JWT projected into the pod's filesystem, all pods with the same service account will have the same permission within Pulsar. There is ongoing work to improve this integration.
Here is a sample JWT generated by this feature running in EKS (with some information redacted):
{
  "aud": [
    "your-audience"
  ],
  "exp": 1710969822,
  "iat": 1679433822,
  "iss": "https://oidc.eks.us-east-2.amazonaws.com/id/some-id",
  "kubernetes.io": {
    "namespace": "pulsar-function",
    "pod": {
      "name": "function-pod-0",
      "uid": "fbac8f9e-a47d-4ad7-a8f0-cc9a65d1331c"
    },
    "serviceaccount": {
      "name": "default",
      "uid": "5964f9d3-3dce-467c-8dbe-d0f463063d7a"
    },
    "warnafter": 1679437429
  },
  "nbf": 1679433822,
  "sub": "system:serviceaccount:pulsar-function:default"
}
To grant permissions to this function pod, grant them to the role identified by the role claim, which is the sub claim by default. In the sample JWT above, that role is system:serviceaccount:pulsar-function:default.
Customize Kubernetes runtime
Customizing Kubernetes runtime allows you to customize Kubernetes resources created by the runtime, including how to generate manifests, how to pass authenticated data to pods, and how to integrate secrets.
To customize Kubernetes runtime, set runtimeCustomizerClassName in the conf/functions_worker.yml file to the fully qualified class name of your customizer.
The function API provides a flag named customRuntimeOptions, which is passed to the org.apache.pulsar.functions.runtime.kubernetes.KubernetesManifestCustomizer interface. To initialize KubernetesManifestCustomizer, you can set runtimeCustomizerConfig in the conf/functions_worker.yml file.
runtimeCustomizerConfig is the same across all functions. If you provide both runtimeCustomizerConfig and customRuntimeOptions, you need to decide how to manage these two configurations in your implementation of the KubernetesManifestCustomizer interface.
Pulsar includes a built-in implementation initialized with runtimeCustomizerConfig. It enables you to pass a JSON document as customRuntimeOptions with certain properties to augment. To use this built-in implementation, set runtimeCustomizerClassName to org.apache.pulsar.functions.runtime.kubernetes.BasicKubernetesManifestCustomizer.
If both runtimeCustomizerConfig and customRuntimeOptions are provided and have conflicts, BasicKubernetesManifestCustomizer uses customRuntimeOptions to override runtimeCustomizerConfig.
Below is an example of configuring customRuntimeOptions.
{
  "jobName": "jobname", // the k8s pod name to run this function instance
  "jobNamespace": "namespace", // the k8s namespace to run this function in
  "extraLabels": { // extra labels to attach to the statefulSet, service, and pods
    "extraLabel": "value"
  },
  "extraAnnotations": { // extra annotations to attach to the statefulSet, service, and pods
    "extraAnnotation": "value"
  },
  "nodeSelectorLabels": { // node selector labels to add on to the pod spec
    "customLabel": "value"
  },
  "tolerations": [ // tolerations to add to the pod spec
    {
      "key": "custom-key",
      "value": "value",
      "effect": "NoSchedule"
    }
  ],
  "resourceRequirements": { // values for cpu and memory should be defined as described here: https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container
    "requests": {
      "cpu": 1,
      "memory": "4G"
    },
    "limits": {
      "cpu": 2,
      "memory": "8G"
    }
  }
}
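Worker-wide defaults for the built-in customizer could be expressed in the function worker configuration along the lines of the sketch below. It assumes that runtimeCustomizerConfig accepts the same property names as customRuntimeOptions; the annotation and label values are hypothetical.

```yaml
# Function worker configuration (sketch; annotation and label values are placeholders)
runtimeCustomizerClassName: org.apache.pulsar.functions.runtime.kubernetes.BasicKubernetesManifestCustomizer
runtimeCustomizerConfig:
  # Assumption: the built-in customizer reads the same property names
  # here as in customRuntimeOptions.
  extraAnnotations:
    example.com/owner: "data-platform"
  nodeSelectorLabels:
    disktype: "ssd"
```

With the built-in customizer, any conflicting property supplied per function through customRuntimeOptions would take precedence over these defaults.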
How to define Pulsar resource names when running Pulsar in Kubernetes
If you run Pulsar Functions or connectors on Kubernetes, you need to follow the Kubernetes naming convention to define the names of your Pulsar resources, whichever admin interface you use.
Kubernetes requires a name that can be used as a DNS subdomain name as defined in RFC 1123. Pulsar supports more legal characters than the Kubernetes naming convention. If you create a Pulsar resource name with special characters that are not supported by Kubernetes (for example, including colons in a Pulsar namespace name), Kubernetes runtime translates the Pulsar object names into Kubernetes resource labels which are in RFC 1123-compliant forms. Consequently, you can run functions or connectors using Kubernetes runtime. The rules for translating Pulsar object names into Kubernetes resource labels are as below:
- Truncate to 63 characters
- Replace the following characters with dashes (-):
  - Non-alphanumeric characters
  - Underscores (_)
  - Dots (.)
- Replace beginning and ending non-alphanumeric characters with 0
- If you get an error in translating Pulsar object names into Kubernetes resource labels (for example, you may have a naming collision if your Pulsar object name is too long) or want to customize the translating rules, see customize Kubernetes runtime.
- For how to configure Kubernetes runtime, see instructions.