Skip to main content

Validating release candidates

This page contains manual instructions for reviewing and verifying a release candidate.

Validate the binary distribution

note

There's a bash script validate_pulsar_release.sh available for assisting in semi-automated validation for the following steps.

Download And Verify the binary distributions

Download the server distribution apache-pulsar-<release>-bin.tar.gz and extract it. The extracted files are in a directory called apache-pulsar-<release>. All the operations below happen within that directory:

cd apache-pulsar-<release>

Check the bookkeeper libs are complied on Linux:

unzip -t ./lib/org.apache.bookkeeper-circe-checksum-*.jar | grep lib
unzip -t ./lib/org.apache.bookkeeper-cpu-affinity-*.jar | grep lib

The output should look like:

testing: lib/                     OK
testing: lib/libcirce-checksum.so OK
testing: lib/ OK
testing: lib/libcpu-affinity.so OK

Download the Cassandra connector:

mkdir connectors
mv pulsar-io-cassandra-<release>.nar connectors

Download the *.asc file and verify the GPG signature:

gpg --verify apache-pulsar-<release>-bin.tar.gz.asc

Download And Verify the source tarball

Before you start to validate the source tarball, make sure you have installed these software:

  • Amazon Corretto OpenJDK
    • JDK 21 for Pulsar version >= 3.3
      • code will be compiled for Java 17 with Java 21
      • Pulsar docker images are running Java 21 since 3.3.0
    • JDK 17 for Pulsar version >= 2.11
    • JDK 11 for earlier versions
  • Maven 3.9.9 (most recent stable Maven 3.9.x version)
    • Install using sdkman i maven 3.9.9
  • Zip

Please refer to "Setting up JDKs and Maven using SDKMAN" for details on how to install JDKs and Maven using SDKMAN.

Download the source tarball and extract it. The extracted files are in a directory called apache-pulsar-<release>-src

cd apache-pulsar-<release>-src
mvn clean install -DskipTests

Validate Pub/Sub and Java Functions

Standalone service

Open a terminal to start a standalone service:

PULSAR_STANDALONE_USE_ZOOKEEPER=1 bin/pulsar standalone

When you start a standalone cluster, there are a few things to check.

  1. The standalone cluster is able to locate all the connectors. The following logging information should be displayed.
Found connector ConnectorDefinition(name=kinesis, description=Kinesis sink connector, sourceClass=null, sinkClass=org.apache.pulsar.io.kinesis.KinesisSink) from /Users/sijie/tmp/apache-pulsar-2.1.0-incubating/./connectors/pulsar-io-kinesis-2.1.0-incubating.nar
...
Found connector ConnectorDefinition(name=cassandra, description=Writes data into Cassandra, sourceClass=null, sinkClass=org.apache.pulsar.io.cassandra.CassandraStringSink) from /Users/sijie/tmp/apache-pulsar-2.1.0-incubating/./connectors/pulsar-io-cassandra-2.1.0-incubating.nar
...
Found connector ConnectorDefinition(name=aerospike, description=Aerospike database sink, sourceClass=null, sinkClass=org.apache.pulsar.io.aerospike.AerospikeStringSink) from /Users/sijie/tmp/apache-pulsar-2.1.0-incubating/./connectors/pulsar-io-aerospike-2.1.0-incubating.nar
  1. (since Pulsar 2.1 release) The standalone starts bookkeeper table service. The output is similar as follows:
12:12:26.099 [main] INFO  org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble - 'default' namespace for table service : namespace_name: "default"
default_stream_conf {
key_type: HASH
min_num_ranges: 24
initial_num_ranges: 24
split_policy {
fixed_range_policy {
num_ranges: 2
}
}
rolling_policy {
size_policy {
max_segment_size: 134217728
}
}
retention_policy {
time_policy {
retention_minutes: -1
}
}
}
  1. Functions worker is started correctly. The output is similar as follows:
14:28:24.101 [main] INFO  org.apache.pulsar.functions.worker.WorkerService - Starting worker c-standalone-fw-localhost-8080...
14:28:24.907 [main] INFO org.apache.pulsar.functions.worker.WorkerService - Worker Configs: {
"workerId" : "c-standalone-fw-localhost-8080",
"workerHostname" : "localhost",
"workerPort" : 8080,
"workerPortTls" : 6751,
"jvmGCMetricsLoggerClassName" : null,
"numHttpServerThreads" : 8,
"connectorsDirectory" : "./connectors",
"functionMetadataTopicName" : "metadata",
"functionWebServiceUrl" : null,
"pulsarServiceUrl" : "pulsar://127.0.0.1:6650",
"pulsarWebServiceUrl" : "http://127.0.0.1:8080",
"clusterCoordinationTopicName" : "coordinate",
"pulsarFunctionsNamespace" : "public/functions",
"pulsarFunctionsCluster" : "standalone",
"numFunctionPackageReplicas" : 1,
"downloadDirectory" : "/tmp/pulsar_functions",
"stateStorageServiceUrl" : "bk://127.0.0.1:4181",
"functionAssignmentTopicName" : "assignments",
"schedulerClassName" : "org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler",
"failureCheckFreqMs" : 30000,
"rescheduleTimeoutMs" : 60000,
"initialBrokerReconnectMaxRetries" : 60,
"assignmentWriteMaxRetries" : 60,
"instanceLivenessCheckFreqMs" : 30000,
"clientAuthenticationPlugin" : null,
"clientAuthenticationParameters" : null,
"topicCompactionFrequencySec" : 1800,
"tlsEnabled" : true,
"tlsCertificateFilePath" : null,
"tlsKeyFilePath" : null,
"tlsTrustCertsFilePath" : null,
"tlsAllowInsecureConnection" : false,
"tlsRequireTrustedClientCertOnConnect" : false,
"useTls" : false,
"tlsHostnameVerificationEnable" : false,
"authenticationEnabled" : false,
"authenticationProviders" : null,
"authorizationEnabled" : false,
"superUserRoles" : null,
"properties" : { },
"threadContainerFactory" : null,
"processContainerFactory" : {
"javaInstanceJarLocation" : null,
"pythonInstanceLocation" : null,
"logDirectory" : null,
"extraFunctionDependenciesDir" : null
},
"kubernetesContainerFactory" : null,
"secretsProviderConfiguratorClassName" : null,
"secretsProviderConfiguratorConfig" : null,
"functionInstanceMinResources" : null,
"workerWebAddress" : "http://localhost:8080",
"functionMetadataTopic" : "persistent://public/functions/metadata",
"clusterCoordinationTopic" : "persistent://public/functions/coordinate",
"functionAssignmentTopic" : "persistent://public/functions/assignments"
}
  1. Do sanity check before moving to the next step.
# check pulsar binary port is listened correctly
netstat -an | grep 6650 | grep LISTEN

# check function cluster
curl -s http://localhost:8080/admin/v2/worker/cluster
# example output:
# [{"workerId":"c-standalone-fw-localhost-6750","workerHostname":"localhost","port":6750}]

# check brokers
curl -s http://localhost:8080/admin/v2/namespaces/public
# example outoupt:
# ["public/default","public/functions"]

# check connectors
curl -s http://localhost:8080/admin/v2/functions/connectors
# example output:
# [{"name":"aerospike","description":"Aerospike database sink","sinkClass":"org.apache.pulsar.io.aerospike.AerospikeStringSink"},{"name":"cassandra","description":"Writes data into Cassandra","sinkClass":"org.apache.pulsar.io.cassandra.CassandraStringSink"},{"name":"kafka","description":"Kafka source and sink connector","sourceClass":"org.apache.pulsar.io.kafka.KafkaStringSource","sinkClass":"org.apache.pulsar.io.kafka.KafkaStringSink"},{"name":"kinesis","description":"Kinesis sink connector","sinkClass":"org.apache.pulsar.io.kinesis.KinesisSink"},{"name":"rabbitmq","description":"RabbitMQ source connector","sourceClass":"org.apache.pulsar.io.rabbitmq.RabbitMQSource"},{"name":"twitter","description":"Ingest data from Twitter firehose","sourceClass":"org.apache.pulsar.io.twitter.TwitterFireHose"}]

# check table services (Skip this step on 2.11.x or later)
nc -vz4 localhost 4181

Functions

Open another terminal to submit a Java Exclamation function.

  1. Create tenant and namespace:
bin/pulsar-admin tenants create test
bin/pulsar-admin namespaces create test/test-namespace
  1. Create function.
bin/pulsar-admin functions create --function-config-file examples/example-function-config.yaml --jar examples/api-examples.jar

The following information is returned: Created Successfully.

  1. At the same terminal as step 2, retrieve the function configuration.
bin/pulsar-admin functions get --tenant test --namespace test-namespace --name example

The output is similar as follows:

{
"tenant": "test",
"namespace": "test-namespace",
"name": "example",
"className": "org.apache.pulsar.functions.api.examples.ExclamationFunction",
"userConfig": "{\"PublishTopic\":\"test_result\"}",
"autoAck": true,
"parallelism": 1,
"source": {
"topicsToSerDeClassName": {
"test_src": ""
},
"typeClassName": "java.lang.String"
},
"sink": {
"topic": "test_result",
"typeClassName": "java.lang.String"
},
"resources": {}
}
  1. At the same terminal as step 3, retrieve the function status.
bin/pulsar-admin functions status --tenant test --namespace test-namespace --name example

The output is similar as follows:

{
"numInstances" : 1,
"numRunning" : 1,
"instances" : [ {
"instanceId" : 0,
"status" : {
"running" : true,
"error" : "",
"numRestarts" : 0,
"numReceived" : 0,
"numSuccessfullyProcessed" : 0,
"numUserExceptions" : 0,
"latestUserExceptions" : [ ],
"numSystemExceptions" : 0,
"latestSystemExceptions" : [ ],
"averageLatency" : 0.0,
"lastInvocationTime" : 0,
"workerId" : "c-standalone-fw-localhost-8080"
}
} ]
}
  1. At the same terminal as step 4, subscribe the output topic test_result.
bin/pulsar-client consume -s test-sub -n 0 test_result
  1. Open a new terminal to produce messages into the input topic test_src.
bin/pulsar-client produce -m "test-messages-`date`" -n 10 test_src
  1. At the terminal of step 5, the messages produced by the Exclamation function is returned. The output is similar as follows:
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2018!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2018!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2018!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2018!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2018!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2018!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2018!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2018!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2018!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2018!

Validate Connectors

note

Make sure you have docker available at your laptop. If you haven't installed docker, you can skip this section.

  1. Set up a cassandra cluster.
docker run -d --rm  --name=cassandra -p 9042:9042 cassandra:3.11

Make sure that the cassandra cluster is running.

# run docker ps to find the docker process for cassandra
docker ps
# check if the cassandra is running as expected
docker logs cassandra
# check the cluster status
docker exec cassandra nodetool status

# Datacenter: datacenter1
# =======================
# Status=Up/Down
# |/ State=Normal/Leaving/Joining/Moving
# -- Address Load Tokens Owns (effective) Host ID Rack
# UN 172.17.0.2 103.67 KiB 256 100.0% af0e4b2f-84e0-4f0b-bb14-bd5f9070ff26 rack1
  1. Create keyspace and table.

Run cqlsh:

docker exec -ti cassandra cqlsh localhost

In the cqlsh, create the pulsar_test_keyspace keyspace and the pulsar_test_table table.

cqlsh> CREATE KEYSPACE pulsar_test_keyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};
cqlsh> USE pulsar_test_keyspace;
cqlsh:pulsar_test_keyspace> CREATE TABLE pulsar_test_table (key text PRIMARY KEY, col text);
cqlsh:pulsar_test_keyspace> exit
  1. Prepare a cassandra sink yaml file and put it under examples directory as cassandra-sink.yml.
vim examples/cassandra-sink.yml

The content should be:

configs:
roots: "localhost:9042"
keyspace: "pulsar_test_keyspace"
columnFamily: "pulsar_test_table"
keyname: "key"
columnName: "col"
  1. Submit a cassandra sink.
bin/pulsar-admin sink create --tenant public --namespace default --name cassandra-test-sink --sink-type cassandra --sink-config-file examples/cassandra-sink.yml --inputs test_cassandra
# "Created successfully"

Get the sink info:

bin/pulsar-admin sink get --tenant public --namespace default --name cassandra-test-sink

The output is similar as follows:

{
"tenant": "public",
"namespace": "default",
"name": "cassandra-test-sink",
"className": "org.apache.pulsar.io.cassandra.CassandraStringSink",
"inputSpecs": {
"test_cassandra": {
"isRegexPattern": false
}
},
"configs": {
"roots": "localhost:9042",
"keyspace": "pulsar_test_keyspace",
"columnFamily": "pulsar_test_table",
"keyname": "key",
"columnName": "col"
},
"parallelism": 1,
"processingGuarantees": "ATLEAST_ONCE",
"retainOrdering": false,
"autoAck": true,
"archive": "builtin://cassandra"
}

Get the running status:

bin/pulsar-admin sink status --tenant public --namespace default --name cassandra-test-sink

The output is similar as follows:

{
"numInstances" : 1,
"numRunning" : 1,
"instances" : [ {
"instanceId" : 0,
"status" : {
"running" : true,
"error" : "",
"numRestarts" : 0,
"numReadFromPulsar" : 0,
"numSystemExceptions" : 0,
"latestSystemExceptions" : [ ],
"numSinkExceptions" : 0,
"latestSinkExceptions" : [ ],
"numWrittenToSink" : 0,
"lastReceivedTime" : 0,
"workerId" : "c-standalone-fw-localhost-8080"
}
} ]
}
  1. Produce messages to the source topic.
for i in {0..10}; do bin/pulsar-client produce -m "key-$i" -n 1 test_cassandra; done
  1. Check the sink status, and 11 messages are processed.
bin/pulsar-admin sink status --tenant public --namespace default --name cassandra-test-sink

The output is similar as follows:

{
"numInstances" : 1,
"numRunning" : 1,
"instances" : [ {
"instanceId" : 0,
"status" : {
"running" : true,
"error" : "",
"numRestarts" : 0,
"numReadFromPulsar" : 11,
"numSystemExceptions" : 0,
"latestSystemExceptions" : [ ],
"numSinkExceptions" : 0,
"latestSinkExceptions" : [ ],
"numWrittenToSink" : 11,
"lastReceivedTime" : 1554833501277,
"workerId" : "c-standalone-fw-localhost-8080"
}
} ]
}
  1. Check results in cassandra.
docker exec -ti cassandra cqlsh localhost

In the cqlsh session:

cqlsh> use pulsar_test_keyspace;
cqlsh:pulsar_test_keyspace> select * from pulsar_test_table;

key | col
--------+--------
key-5 | key-5
key-0 | key-0
key-9 | key-9
key-2 | key-2
key-1 | key-1
key-3 | key-3
key-6 | key-6
key-7 | key-7
key-4 | key-4
key-8 | key-8
key-10 | key-10

(11 rows)
cqlsh:pulsar_test_keyspace> exit
  1. Delete the sink.
bin/pulsar-admin sink delete --tenant public --namespace default --name cassandra-test-sink
# "Deleted successfully"
  1. Stop the Cassandra container
docker stop cassandra

Validate Stateful Functions

Since Pulsar 2.1 release, Pulsar enables bookkeeper table service for stateful Pulsar functions (as a developer preview).

The following are instructions to validate counter functions.

  1. Create a wordcount function.
bin/pulsar-admin functions create --function-config-file examples/example-function-config.yaml --jar examples/api-examples.jar --name word_count --className org.apache.pulsar.functions.api.examples.WordCountFunction --inputs test_wordcount_src --output test_wordcount_dest
# "Created successfully"
  1. Get function information and status.
bin/pulsar-admin functions get --tenant test --namespace test-namespace --name word_count

The output is similar as follows:

{
"tenant": "test",
"namespace": "test-namespace",
"name": "word_count",
"className": "org.apache.pulsar.functions.api.examples.WordCountFunction",
"inputSpecs": {
"test_wordcount_src": {
"isRegexPattern": false
}
},
"output": "test_wordcount_dest",
"processingGuarantees": "ATLEAST_ONCE",
"retainOrdering": false,
"userConfig": {
"PublishTopic": "test_result"
},
"runtime": "JAVA",
"autoAck": true,
"parallelism": 1,
"resources": {
"cpu": 1.0,
"ram": 1073741824,
"disk": 10737418240
},
"cleanupSubscription": true
}
bin/pulsar-admin functions status --tenant test --namespace test-namespace --name word_count

The output is similar as follows:

{
"numInstances" : 1,
"numRunning" : 1,
"instances" : [ {
"instanceId" : 0,
"status" : {
"running" : true,
"error" : "",
"numRestarts" : 0,
"numReceived" : 0,
"numSuccessfullyProcessed" : 0,
"numUserExceptions" : 0,
"latestUserExceptions" : [ ],
"numSystemExceptions" : 0,
"latestSystemExceptions" : [ ],
"averageLatency" : 0.0,
"lastInvocationTime" : 0,
"workerId" : "c-standalone-fw-localhost-8080"
}
} ]
}
  1. Query the state table for the function: watching on a key called "hello"
bin/pulsar-admin functions querystate --tenant test --namespace test-namespace --name word_count -k hello -w
# key 'hello' doesn't exist.
# key 'hello' doesn't exist.
# key 'hello' doesn't exist
  1. Produce the messages to source topic test_wordcount_src.

Produce 10 messages "hello" to the test_wordcount_src topic. The value of "hello" is updated to 10.

bin/pulsar-client produce -m "hello" -n 10 test_wordcount_src

Checkout the result in the terminal of step 3.

{
"key": "hello",
"numberValue": 10,
"version": 9
}

Produce another 10 messages "hello". The result is updated to 20.

bin/pulsar-client produce -m "hello" -n 10 test_wordcount_src

The result in the terminal of step 3 is updated to 20.

  "key": "hello",
"numberValue": 20,
"version": 19