Version: Next

Use APIs

The following table outlines the APIs that you can use to develop Pulsar Functions in Java, Python, and Go.

| Interface | Description | Use case |
|---|---|---|
| Language-native interface for Java/Python | No Pulsar-specific libraries or special dependencies required (only core libraries). | Functions that do not require access to the context. |
| Pulsar Functions SDK for Java/Python/Go | Pulsar-specific libraries that provide a range of functionality not available in the language-native interfaces, such as state management or user configuration. | Functions that require access to the context. |
| Extended Pulsar Functions SDK for Java | An extension to the Pulsar-specific libraries that provides the initialization and close interfaces in Java. | Functions that need to initialize and release external resources. |

Use language-native interface for Java/Python

The language-native interface provides a simple and clean approach to writing Java/Python functions, and it has no external dependencies. The example function in this section adds an exclamation point to each incoming string and publishes the output string to a topic.

The following example is a language-native function written in Java.

To use a piece of Java code as a "language-native" function, implement the java.util.function.Function interface. You can put arbitrarily complex logic inside the apply method to provide more processing capabilities.

import java.util.function.Function;

public class JavaNativeExclamationFunction implements Function<String, String> {
    @Override
    public String apply(String input) {
        // Append an exclamation point to every incoming string
        return String.format("%s!", input);
    }
}

For more details, see code example.
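A language-native function is a plain java.util.function.Function, so its logic can be checked locally by calling apply directly, without a Pulsar cluster. The following sketch shows a hypothetical WordCountFunction that counts the words in each incoming string. The class name and the Integer output type are illustrative assumptions and do not come from the Pulsar examples. Deploying it would also assume that the output topic's schema accepts integer values.

```java
import java.util.function.Function;

// Hypothetical language-native function: counts whitespace-separated words.
// It depends only on core Java libraries (String.isBlank requires Java 11+).
public class WordCountFunction implements Function<String, Integer> {
    @Override
    public Integer apply(String input) {
        // Treat null or blank input as containing zero words
        if (input == null || input.isBlank()) {
            return 0;
        }
        // Trim leading/trailing whitespace, then split on runs of whitespace
        return input.trim().split("\\s+").length;
    }

    public static void main(String[] args) {
        Function<String, Integer> fn = new WordCountFunction();
        System.out.println(fn.apply("hello pulsar functions")); // prints 3
        System.out.println(fn.apply("   "));                    // prints 0
    }
}
```

Calling apply directly like this is only a local check of the function's logic. When the function runs inside Pulsar, the runtime invokes apply once for each incoming message.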

Use SDK for Java/Python/Go

The Pulsar Functions SDK specifies a functional interface whose method takes the context object as a parameter.

The following examples use the Pulsar Functions SDK for Java.

When developing a function with the Java SDK, you need to implement the org.apache.pulsar.functions.api.Function interface. It specifies only one method that you must implement: process.

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class ExclamationFunction implements Function<String, String> {
    @Override
    public String process(String input, Context context) {
        // The context is available here, although this example does not use it
        return String.format("%s!", input);
    }
}

For more details, see code example.

You can wrap the function's return type in the Record generic type, which gives you more control over the output messages, such as their topics, schemas, and properties. Use the Context::newOutputRecordBuilder method to build the Record output.

import java.util.HashMap;
import java.util.Map;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;

public class RecordFunction implements Function<String, Record<String>> {

    @Override
    public Record<String> process(String input, Context context) throws Exception {
        String output = String.format("%s!", input);

        // Copy the properties of the incoming message and tag them with its source topic
        Map<String, String> properties = new HashMap<>(context.getCurrentRecord().getProperties());
        context.getCurrentRecord().getTopicName().ifPresent(topic -> properties.put("input_topic", topic));

        // Build the output record with an explicit schema, value, and properties
        return context.newOutputRecordBuilder(Schema.STRING)
                .value(output)
                .properties(properties)
                .build();
    }
}

For more details, see code example.

Use extended SDK for Java

The extended Pulsar Functions SDK provides two additional interfaces for initializing and releasing external resources.

  • By using the initialize interface, you can initialize external resources that need only one-time initialization when the function instance starts.
  • By using the close interface, you can release the referenced external resources when the function instance closes.
Note

The extended Pulsar Functions SDK for Java is available only in Pulsar 2.10.0 and later versions. Before using it, set up function workers that run Pulsar 2.10.0 or later.

The following example uses the extended interface of the Pulsar Functions SDK for Java to create a Lettuce RedisClient when the function instance starts and to release it when the function instance closes.

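import java.util.Map;
import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class InitializableFunction implements Function<String, String> {
    private RedisClient redisClient;
    private StatefulRedisConnection<String, String> connection;

    // Create the client and open a connection using the "redisURI" user config entry
    private void initRedisClient(Map<String, Object> connectInfo) {
        redisClient = RedisClient.create((String) connectInfo.get("redisURI"));
        connection = redisClient.connect();
    }

    @Override
    public void initialize(Context context) {
        // Runs once when the function instance starts
        Map<String, Object> connectInfo = context.getUserConfigMap();
        initRedisClient(connectInfo);
    }

    @Override
    public String process(String input, Context context) {
        // This example uses the input string as the Redis key
        String value = connection.sync().get(input);
        return String.format("%s-%s", input, value);
    }

    @Override
    public void close() {
        // Runs once when the function instance closes
        connection.close();
        redisClient.shutdown();
    }
}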