Pulsar Functions State Storage (Developer Preview)
Since the Pulsar 2.1.0 release, Pulsar integrates with the Apache BookKeeper table service for storing the state of functions. For example, a WordCount function can store the state of its counters in BookKeeper's table service through the Pulsar Functions State API.
API
Java API
Currently, Pulsar Functions expose the following APIs for mutating and accessing state. These APIs are available in the Context object when you use the Java SDK for functions.
incrCounter
/**
* Increment the builtin distributed counter referred by key
* @param key The name of the key
* @param amount The amount to be incremented
*/
void incrCounter(String key, long amount);
The application can use incrCounter to change the counter of a given key by the given amount.
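You can illustrate the counter semantics without a running cluster. The following minimal sketch uses an in-memory map in place of BookKeeper. CounterStoreSketch and IncrCounterDemo are hypothetical names written for illustration and are not part of the Pulsar API. The sketch's rule that a never-incremented key reads as 0 is also an assumption, not documented Pulsar behavior. In a real Java function, you would call the same methods on the Context passed to process.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the counter methods of Pulsar's Context.
// Real functions receive a BookKeeper-backed Context at runtime instead.
class CounterStoreSketch {
    private final Map<String, Long> counters = new ConcurrentHashMap<>();

    // Same shape as: void incrCounter(String key, long amount)
    void incrCounter(String key, long amount) {
        // ConcurrentHashMap.merge adds amount atomically; a missing key starts at amount.
        counters.merge(key, amount, Long::sum);
    }

    // Same shape as: long getCounter(String key)
    long getCounter(String key) {
        // Assumption of this sketch: a key that was never incremented reads as 0.
        return counters.getOrDefault(key, 0L);
    }
}

class IncrCounterDemo {
    public static void main(String[] args) {
        CounterStoreSketch context = new CounterStoreSketch();
        context.incrCounter("apple", 1);
        context.incrCounter("apple", 2);
        System.out.println(context.getCounter("apple")); // prints 3
    }
}
```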
incrCounterAsync
/**
* Increment the builtin distributed counter referred by key
* but don't wait for the completion of the increment operation
*
* @param key The name of the key
* @param amount The amount to be incremented
*/
CompletableFuture<Void> incrCounterAsync(String key, long amount);
The application can use incrCounterAsync to asynchronously change the counter of a given key by the given amount.
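A common way to use incrCounterAsync is to issue several increments without blocking, then wait for all of them only when their completion matters. The sketch below shows that pattern with CompletableFuture.allOf. AsyncCounterSketch is a hypothetical in-memory stand-in for the Context, and its thread pool only simulates a remote store.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in mimicking the shape of incrCounterAsync:
// the call returns immediately and the update runs on another thread.
class AsyncCounterSketch implements AutoCloseable {
    private final Map<String, Long> counters = new ConcurrentHashMap<>();
    private final ExecutorService io = Executors.newFixedThreadPool(4);

    // Same shape as: CompletableFuture<Void> incrCounterAsync(String key, long amount)
    CompletableFuture<Void> incrCounterAsync(String key, long amount) {
        return CompletableFuture.runAsync(() -> counters.merge(key, amount, Long::sum), io);
    }

    long getCounter(String key) {
        return counters.getOrDefault(key, 0L);
    }

    @Override
    public void close() {
        io.shutdown();
    }
}

class IncrCounterAsyncDemo {
    public static void main(String[] args) {
        try (AsyncCounterSketch context = new AsyncCounterSketch()) {
            List<CompletableFuture<Void>> pending = new ArrayList<>();
            for (String word : "a b a c a".split(" ")) {
                // Fire each increment without blocking the processing thread.
                pending.add(context.incrCounterAsync(word, 1));
            }
            // Wait once for all outstanding increments before reading.
            CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
            System.out.println(context.getCounter("a")); // prints 3
        }
    }
}
```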
getCounter
/**
* Retrieve the counter value for the key.
*
* @param key name of the key
* @return the amount of the counter value for this key
*/
long getCounter(String key);
The application can use getCounter to retrieve the counter of a given key, as mutated by incrCounter or incrCounterAsync.
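The sketch below pairs getCounter with incrCounter: a function-like class counts a word and then reads its counter back. CounterContextSketch is a hypothetical interface that declares only these two methods, and MapCounterContext is a hypothetical single-threaded implementation. It sees its own writes immediately and does not model BookKeeper's behavior. RunningCountSketch mimics a function body but does not implement Pulsar's Function interface.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical interface with only the two Context methods this sketch uses.
interface CounterContextSketch {
    void incrCounter(String key, long amount);

    long getCounter(String key);
}

// Single-threaded in-memory implementation, for illustration only.
class MapCounterContext implements CounterContextSketch {
    private final Map<String, Long> counters = new HashMap<>();

    @Override
    public void incrCounter(String key, long amount) {
        counters.merge(key, amount, Long::sum);
    }

    @Override
    public long getCounter(String key) {
        // Assumption of this sketch: an unknown key reads as 0.
        return counters.getOrDefault(key, 0L);
    }
}

// Function-like class: counts a word, then reads its counter back with getCounter.
class RunningCountSketch {
    String process(String word, CounterContextSketch context) {
        context.incrCounter(word, 1);
        return word + "=" + context.getCounter(word);
    }
}

class GetCounterDemo {
    public static void main(String[] args) {
        CounterContextSketch context = new MapCounterContext();
        RunningCountSketch function = new RunningCountSketch();
        System.out.println(function.process("pulsar", context)); // prints pulsar=1
        System.out.println(function.process("pulsar", context)); // prints pulsar=2
    }
}
```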
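getCounter also has an asynchronous variant, getCounterAsync. Besides the counter API, Pulsar also exposes a general key/value API (putState and getState, plus their asynchronous variants) for functions to store general key/value state.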
getCounterAsync
/**
* Retrieve the counter value for the key, but don't wait
* for the operation to be completed
*
* @param key name of the key
* @return the amount of the counter value for this key
*/
CompletableFuture<Long> getCounterAsync(String key);
The application can use getCounterAsync to asynchronously retrieve the counter of a given key, as mutated by incrCounter or incrCounterAsync.
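Because getCounterAsync returns a CompletableFuture<Long>, it composes with the other asynchronous calls. The sketch below chains an increment, a read and a formatting step with thenCompose and thenApply, and blocks only at the final join(). AsyncCounterReadSketch is a hypothetical in-memory stand-in, not Pulsar's Context.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in mimicking the shapes of incrCounterAsync and getCounterAsync.
class AsyncCounterReadSketch implements AutoCloseable {
    private final Map<String, Long> counters = new ConcurrentHashMap<>();
    private final ExecutorService io = Executors.newSingleThreadExecutor();

    CompletableFuture<Void> incrCounterAsync(String key, long amount) {
        return CompletableFuture.runAsync(() -> counters.merge(key, amount, Long::sum), io);
    }

    // Same shape as: CompletableFuture<Long> getCounterAsync(String key)
    CompletableFuture<Long> getCounterAsync(String key) {
        // Assumption of this sketch: an unknown key reads as 0.
        return CompletableFuture.supplyAsync(() -> counters.getOrDefault(key, 0L), io);
    }

    @Override
    public void close() {
        io.shutdown();
    }
}

class GetCounterAsyncDemo {
    public static void main(String[] args) {
        try (AsyncCounterReadSketch context = new AsyncCounterReadSketch()) {
            // Increment, then read, then format; nothing blocks until join().
            CompletableFuture<String> report = context.incrCounterAsync("clicks", 5)
                    .thenCompose(ignored -> context.getCounterAsync("clicks"))
                    .thenApply(count -> "clicks=" + count);
            System.out.println(report.join()); // prints clicks=5
        }
    }
}
```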
putState
/**
* Update the state value for the key.
*
* @param key name of the key
* @param value state value of the key
*/
void putState(String key, ByteBuffer value);
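putState stores raw bytes, so the function decides how to encode its values into a ByteBuffer. The following sketch shows one option for a String (UTF-8) and for a long (8 bytes in ByteBuffer's default big-endian order). These encodings are choices made for this example, not a format Pulsar prescribes. KeyValueStateSketch and StateCodecSketch are hypothetical names. The in-memory store copies the buffer so that later changes by the caller do not leak into stored state.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the key/value methods of Pulsar's Context.
class KeyValueStateSketch {
    private final Map<String, ByteBuffer> state = new HashMap<>();

    // Same shape as: void putState(String key, ByteBuffer value)
    void putState(String key, ByteBuffer value) {
        // Copy the remaining bytes; duplicate() leaves the caller's position untouched.
        ByteBuffer copy = ByteBuffer.allocate(value.remaining());
        copy.put(value.duplicate());
        copy.flip();
        state.put(key, copy);
    }

    // Same shape as: ByteBuffer getState(String key); null for unknown keys in this sketch.
    ByteBuffer getState(String key) {
        ByteBuffer stored = state.get(key);
        return stored == null ? null : stored.asReadOnlyBuffer();
    }
}

// Hypothetical helpers: UTF-8 for strings, 8 big-endian bytes for longs.
class StateCodecSketch {
    static ByteBuffer encodeString(String s) {
        return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
    }

    static ByteBuffer encodeLong(long v) {
        ByteBuffer buf = ByteBuffer.allocate(Long.BYTES);
        buf.putLong(v);
        buf.flip(); // make the 8 written bytes readable
        return buf;
    }
}

class PutStateDemo {
    public static void main(String[] args) {
        KeyValueStateSketch context = new KeyValueStateSketch();
        context.putState("last-user", StateCodecSketch.encodeString("alice"));
        context.putState("high-water-mark", StateCodecSketch.encodeLong(42L));
        System.out.println(StandardCharsets.UTF_8.decode(context.getState("last-user"))); // prints alice
        System.out.println(context.getState("high-water-mark").getLong()); // prints 42
    }
}
```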
putStateAsync
/**
* Update the state value for the key, but don't wait for the operation to be completed
*
* @param key name of the key
* @param value state value of the key
*/
CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
The application can use putStateAsync to asynchronously update the state of a given key.
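Because putStateAsync returns a CompletableFuture<Void>, you can attach follow-up work to the write instead of blocking on it. The sketch below uses whenComplete to react when the write finishes, and it calls join only at the point where the write must have completed. AsyncPutStateSketch is a hypothetical in-memory stand-in. Copying the caller's bytes before the asynchronous write is a defensive choice of this sketch, not documented Pulsar behavior.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in mimicking the shape of putStateAsync.
class AsyncPutStateSketch implements AutoCloseable {
    private final Map<String, ByteBuffer> state = new ConcurrentHashMap<>();
    private final ExecutorService io = Executors.newSingleThreadExecutor();

    // Same shape as: CompletableFuture<Void> putStateAsync(String key, ByteBuffer value)
    CompletableFuture<Void> putStateAsync(String key, ByteBuffer value) {
        // Copy on the caller's thread so the caller's buffer position is untouched
        // and later reuse of that buffer cannot race with the write.
        byte[] bytes = new byte[value.remaining()];
        value.duplicate().get(bytes);
        return CompletableFuture.runAsync(() -> state.put(key, ByteBuffer.wrap(bytes)), io);
    }

    ByteBuffer getState(String key) {
        ByteBuffer stored = state.get(key);
        return stored == null ? null : stored.asReadOnlyBuffer();
    }

    @Override
    public void close() {
        io.shutdown();
    }
}

class PutStateAsyncDemo {
    public static void main(String[] args) {
        try (AsyncPutStateSketch context = new AsyncPutStateSketch()) {
            ByteBuffer value = ByteBuffer.wrap("checkpoint-17".getBytes(StandardCharsets.UTF_8));
            CompletableFuture<Void> write = context.putStateAsync("checkpoint", value)
                    .whenComplete((ignored, error) ->
                            // Runs once the write finishes, successfully or not.
                            System.out.println(error == null ? "stored" : "failed: " + error));
            write.join(); // block only where the write must have completed
            System.out.println(StandardCharsets.UTF_8.decode(context.getState("checkpoint")));
        }
    }
}
```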
getState
/**
* Retrieve the state value for the key.
*
* @param key name of the key
* @return the state value for the key.
*/
ByteBuffer getState(String key);
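getState returns raw bytes, so the reader must decode them with the same encoding the writer used. The sketch below decodes a UTF-8 string and falls back to a default for a key that was never written. It assumes that a missing key yields null. That is a convention of the hypothetical ReadStateSketch stand-in, so check how your Pulsar version reports missing keys before you rely on it. StateReader is also a hypothetical helper.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in; returning null for unknown keys is this sketch's convention.
class ReadStateSketch {
    private final Map<String, ByteBuffer> state = new HashMap<>();

    void putState(String key, ByteBuffer value) {
        byte[] bytes = new byte[value.remaining()];
        value.duplicate().get(bytes);
        state.put(key, ByteBuffer.wrap(bytes));
    }

    // Same shape as: ByteBuffer getState(String key)
    ByteBuffer getState(String key) {
        ByteBuffer stored = state.get(key);
        return stored == null ? null : stored.asReadOnlyBuffer();
    }
}

class StateReader {
    // Decode a UTF-8 value, or return the fallback when the key is absent.
    static String getString(ReadStateSketch context, String key, String fallback) {
        ByteBuffer raw = context.getState(key);
        if (raw == null) {
            return fallback;
        }
        // Decode a duplicate so the caller's view of the buffer is not consumed.
        return StandardCharsets.UTF_8.decode(raw.duplicate()).toString();
    }
}

class GetStateDemo {
    public static void main(String[] args) {
        ReadStateSketch context = new ReadStateSketch();
        context.putState("greeting", ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)));
        System.out.println(StateReader.getString(context, "greeting", "none")); // prints hello
        System.out.println(StateReader.getString(context, "missing", "none")); // prints none
    }
}
```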
getStateAsync
/**
* Retrieve the state value for the key, but don't wait for the operation to be completed
*
* @param key name of the key
* @return the state value for the key.
*/
CompletableFuture<ByteBuffer> getStateAsync(String key);
The application can use getStateAsync to asynchronously retrieve the state of a given key.
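With getStateAsync, the decoding step can run when the value arrives. The sketch below maps the returned CompletableFuture<ByteBuffer> to a CompletableFuture<Optional<String>> and uses Optional.empty() for a missing key. As in the synchronous case, AsyncGetStateSketch is a hypothetical in-memory stand-in, and returning null for an unknown key is an assumption of the sketch.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in mimicking the shape of getStateAsync.
class AsyncGetStateSketch implements AutoCloseable {
    private final Map<String, ByteBuffer> state = new ConcurrentHashMap<>();
    private final ExecutorService io = Executors.newSingleThreadExecutor();

    void putState(String key, ByteBuffer value) {
        byte[] bytes = new byte[value.remaining()];
        value.duplicate().get(bytes);
        state.put(key, ByteBuffer.wrap(bytes));
    }

    // Same shape as: CompletableFuture<ByteBuffer> getStateAsync(String key)
    CompletableFuture<ByteBuffer> getStateAsync(String key) {
        return CompletableFuture.supplyAsync(() -> {
            ByteBuffer stored = state.get(key);
            // null stands for an unknown key in this sketch (an assumption).
            return stored == null ? null : stored.asReadOnlyBuffer();
        }, io);
    }

    @Override
    public void close() {
        io.shutdown();
    }
}

class GetStateAsyncDemo {
    // Decode when the value arrives; Optional.empty() means the key was missing.
    static CompletableFuture<Optional<String>> readString(AsyncGetStateSketch context, String key) {
        return context.getStateAsync(key)
                .thenApply(buf -> Optional.ofNullable(buf)
                        .map(b -> StandardCharsets.UTF_8.decode(b).toString()));
    }

    public static void main(String[] args) {
        try (AsyncGetStateSketch context = new AsyncGetStateSketch()) {
            context.putState("config", ByteBuffer.wrap("v2".getBytes(StandardCharsets.UTF_8)));
            System.out.println(readString(context, "config").join().orElse("<unset>")); // prints v2
            System.out.println(readString(context, "other").join().orElse("<unset>")); // prints <unset>
        }
    }
}
```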
Python API
State is currently not supported in the Python SDK.
Query State
A Pulsar Function can use the State API to store state in Pulsar's state storage and to retrieve it. Pulsar also provides CLI commands for querying a function's state.
$ bin/pulsar-admin functions querystate \
--tenant <tenant> \
--namespace <namespace> \
--name <function-name> \
--state-storage-url <bookkeeper-service-url> \
--key <state-key> \
[--watch]
If --watch is specified, the CLI watches the value of the provided <state-key>.
Example
Java Example
WordCountFunction is a good example of how an application can store state in Pulsar Functions.
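import java.util.Arrays;

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class WordCountFunction implements Function<String, Void> {
    @Override
    public Void process(String input, Context context) throws Exception {
        // Split the input on periods and add 1 to the counter of each resulting word.
        Arrays.asList(input.split("\\.")).forEach(word -> context.incrCounter(word, 1));
        return null;
    }
}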
The logic of this WordCount function is simple and straightforward:
- The function first splits the received String into multiple words using the regex \\. (that is, on literal periods).
- For each word, the function increments the corresponding counter by 1 via incrCounter(key, amount).
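The sketch below shows what the function above records. It runs the same splitting and counting logic against a hypothetical in-memory WordCountContextSketch, without the Pulsar types, so that it runs as plain Java. WordCountSketch is a stand-in for the function, not the Pulsar class itself.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical minimal context backed by a TreeMap, so printed output is sorted.
// Not Pulsar's Context.
class WordCountContextSketch {
    final Map<String, Long> counters = new TreeMap<>();

    void incrCounter(String key, long amount) {
        counters.merge(key, amount, Long::sum);
    }

    long getCounter(String key) {
        return counters.getOrDefault(key, 0L);
    }
}

// Same logic as WordCountFunction.process, minus the Pulsar interfaces.
class WordCountSketch {
    static void process(String input, WordCountContextSketch context) {
        Arrays.asList(input.split("\\.")).forEach(word -> context.incrCounter(word, 1));
    }
}

class WordCountDemo {
    public static void main(String[] args) {
        WordCountContextSketch context = new WordCountContextSketch();
        WordCountSketch.process("apple.banana.apple", context);
        WordCountSketch.process("banana.cherry", context);
        System.out.println(context.counters); // prints {apple=2, banana=2, cherry=1}
    }
}
```

Because the regex splits only on periods, an input such as "hello world" is counted under the single key "hello world". Splitting on whitespace (for example, with \\s+) would count it as two words.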
Python Example
State is currently not supported in the Python SDK.