Pulsar Go client
You can use a Pulsar Go client to create Pulsar producers, consumers, and readers in Golang. For Pulsar features that Go clients support, see Client Feature Matrix.
Installation
You can install the pulsar
library by using either go get
or go module
.
Use go get
-
Download the library of Go client to your local environment:
go get -u "github.com/apache/pulsar-client-go/pulsar"
-
Import it into your project:
import "github.com/apache/pulsar-client-go/pulsar"
Use go module
-
Create a directory named
test_dir
and change your working directory to it.mkdir test_dir && cd test_dir
-
Write a sample script (such as
test_example.go
) in thetest_dir
directory and writepackage main
at the beginning of the file.go mod init test_dir
go mod tidy && go mod download
go build test_example.go
./test_example
Connection URLs
To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL.
You can assign Pulsar protocol URLs to specific clusters and use the pulsar
scheme. The following is an example of localhost
with the default port 6650
:
pulsar://localhost:6650
If you have multiple brokers, separate IP:port
by commas:
pulsar://localhost:6550,localhost:6651,localhost:6652
If you use mTLS authentication, add +ssl
in the scheme:
pulsar+ssl://pulsar.us-west.example.com:6651
API reference
API docs are available on the Godoc page.
Release notes
For the changelog of Pulsar Go clients, see release notes.
Create a client
To interact with Pulsar, you need a Client
object first. You can create a client object using the NewClient
function, passing in a ClientOptions
object. Here's an example:
import (
"log"
"time"
"github.com/apache/pulsar-client-go/pulsar"
)
func main() {
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
OperationTimeout: 30 * time.Second,
ConnectionTimeout: 30 * time.Second,
})
if err != nil {
log.Fatalf("Could not instantiate Pulsar client: %v", err)
}
defer client.Close()
}
If you have multiple brokers, you can initiate a client object as below.
import (
"log"
"time"
"github.com/apache/pulsar-client-go/pulsar"
)
func main() {
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650,localhost:6651,localhost:6652",
OperationTimeout: 30 * time.Second,
ConnectionTimeout: 30 * time.Second,
})
if err != nil {
log.Fatalf("Could not instantiate Pulsar client: %v", err)
}
defer client.Close()
}
All configurable parameters for ClientOptions
are here.
Producers
Pulsar producers publish messages to Pulsar topics. You can configure Go producers using a ProducerOptions
object. Here's an example:
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "my-topic",
})
if err != nil {
log.Fatal(err)
}
_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
Payload: []byte("hello"),
})
defer producer.Close()
if err != nil {
fmt.Println("Failed to publish message", err)
}
fmt.Println("Published message")
Producer operations
All available methods of Producer
interface are here.
Producer Example
How to use message router in producer
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
if err != nil {
log.Fatal(err)
}
defer client.Close()
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "my-partitioned-topic",
MessageRouter: func(msg *pulsar.ProducerMessage, tm pulsar.TopicMetadata) int {
fmt.Println("Topic has", tm.NumPartitions(), "partitions. Routing message ", msg, " to partition 2.")
// always push msg to partition 2
return 2
},
})
if err != nil {
log.Fatal(err)
}
defer producer.Close()
for i := 0; i < 10; i++ {
if msgId, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
Payload: []byte(fmt.Sprintf("message-%d", i)),
}); err != nil {
log.Fatal(err)
} else {
log.Println("Published message: ", msgId)
}
}
// subscribe a specific partition of a topic
// for demos only, not recommend to subscribe a specific partition
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
// pulsar partition is a special topic has the suffix '-partition-xx'
Topic: "my-partitioned-topic-partition-2",
SubscriptionName: "my-sub",
Type: pulsar.Shared,
})
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
for i := 0; i < 10; i++ {
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("Received message msgId: %#v -- content: '%s'\n", msg.ID(), string(msg.Payload()))
consumer.Ack(msg)
}
How to use chunking in producer
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: serviceURL,
})
if err != nil {
log.Fatal(err)
}
defer client.Close()
// The message chunking feature is OFF by default.
// By default, a producer chunks the large message based on the max message size (`maxMessageSize`) configured at the broker side (for example, 5MB).
// Client can also configure the max chunked size using the producer configuration `ChunkMaxMessageSize`.
// Note: to enable chunking, you need to disable batching (`DisableBatching=true`) concurrently.
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "my-topic",
DisableBatching: true,
EnableChunking: true,
})
if err != nil {
log.Fatal(err)
}
defer producer.Close()
How to use schema interface in producer
type testJSON struct {
ID int `json:"id"`
Name string `json:"name"`
}
var (
exampleSchemaDef = "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
"\"fields\":[{\"name\":\"ID\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}"
)
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
if err != nil {
log.Fatal(err)
}
defer client.Close()
properties := make(map[string]string)
properties["pulsar"] = "hello"
jsonSchemaWithProperties := pulsar.NewJSONSchema(exampleSchemaDef, properties)
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "jsonTopic",
Schema: jsonSchemaWithProperties,
})
if err != nil {
log.Fatal(err)
}
_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
Value: &testJSON{
ID: 100,
Name: "pulsar",
},
})
if err != nil {
log.Fatal(err)
}
producer.Close()
How to use delay relative in producer
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
if err != nil {
log.Fatal(err)
}
defer client.Close()
topicName := "topic-1"
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: topicName,
DisableBatching: true,
})
if err != nil {
log.Fatal(err)
}
defer producer.Close()
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: topicName,
SubscriptionName: "subName",
Type: pulsar.Shared,
})
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
ID, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
Payload: []byte(fmt.Sprintf("test")),
DeliverAfter: 3 * time.Second,
})
if err != nil {
log.Fatal(err)
}
fmt.Println(ID)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
msg, err := consumer.Receive(ctx)
if err != nil {
log.Fatal(err)
}
fmt.Println(msg.Payload())
cancel()
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
msg, err = consumer.Receive(ctx)
if err != nil {
log.Fatal(err)
}
fmt.Println(msg.Payload())
cancel()
How to use Prometheus metrics in producer
Pulsar Go client registers client metrics using Prometheus. This section demonstrates how to create a simple Pulsar producer application that exposes Prometheus metrics via HTTP.
- Write a simple producer application.
// Create a Pulsar client
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
if err != nil {
log.Fatal(err)
}
defer client.Close()
// Start a separate goroutine for Prometheus metrics
// In this case, Prometheus metrics can be accessed via http://localhost:2112/metrics
go func() {
prometheusPort := 2112
log.Printf("Starting Prometheus metrics at http://localhost:%v/metrics\n", prometheusPort)
http.Handle("/metrics", promhttp.Handler())
err = http.ListenAndServe(":"+strconv.Itoa(prometheusPort), nil)
if err != nil {
log.Fatal(err)
}
}()
// Create a producer
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "topic-1",
})
if err != nil {
log.Fatal(err)
}
defer producer.Close()
ctx := context.Background()
// Write your business logic here
// In this case, you build a simple Web server. You can produce messages by requesting http://localhost:8082/produce
webPort := 8082
http.HandleFunc("/produce", func(w http.ResponseWriter, r *http.Request) {
msgId, err := producer.Send(ctx, &pulsar.ProducerMessage{
Payload: []byte(fmt.Sprintf("hello world")),
})
if err != nil {
log.Fatal(err)
} else {
log.Printf("Published message: %v", msgId)
fmt.Fprintf(w, "Published message: %v", msgId)
}
})
err = http.ListenAndServe(":"+strconv.Itoa(webPort), nil)
if err != nil {
log.Fatal(err)
}
- To scrape metrics from applications, configure a local running Prometheus instance using a configuration file (
prometheus.yml
).
scrape_configs:
- job_name: pulsar-client-go-metrics
scrape_interval: 10s
static_configs:
- targets:
- localhost:2112
Now you can query Pulsar client metrics on Prometheus.
Producer configuration
All available options of ProducerOptions
are here.
Consumers
Pulsar consumers subscribe to one or more Pulsar topics and listen for incoming messages produced on that topic/those topics. You can configure Go consumers using a ConsumerOptions
object. Here's a basic example that uses channels:
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "topic-1",
SubscriptionName: "my-sub",
Type: pulsar.Shared,
})
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
for i := 0; i < 10; i++ {
// may block here
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))
consumer.Ack(msg)
}
if err := consumer.Unsubscribe(); err != nil {
log.Fatal(err)
}
Consumer operations
All available methods of Consumer
interface are here.
Create single-topic consumer
client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
if err != nil {
log.Fatal(err)
}
defer client.Close()
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
// fill `Topic` field will create a single-topic consumer
Topic: "topic-1",
SubscriptionName: "my-sub",
Type: pulsar.Shared,
})
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
Create regex-topic consumer
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
defer client.Close()
topicsPattern := "persistent://public/default/topic.*"
opts := pulsar.ConsumerOptions{
// fill `TopicsPattern` field will create a regex consumer
TopicsPattern: topicsPattern,
SubscriptionName: "regex-sub",
}
consumer, err := client.Subscribe(opts)
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
Create multi-topic consumer
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
if err != nil {
log.Fatal(err)
}
topics := []string{"topic-1", "topic-2"}
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
// fill `Topics` field will create a multi-topic consumer
Topics: topics,
SubscriptionName: "multi-topic-sub",
})
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
Create consumer listener
import (
"fmt"
"log"
"github.com/apache/pulsar-client-go/pulsar"
)
func main() {
client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
if err != nil {
log.Fatal(err)
}
defer client.Close()
// we can listen this channel
channel := make(chan pulsar.ConsumerMessage, 100)
options := pulsar.ConsumerOptions{
Topic: "topic-1",
SubscriptionName: "my-subscription",
Type: pulsar.Shared,
// fill `MessageChannel` field will create a listener
MessageChannel: channel,
}
consumer, err := client.Subscribe(options)
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
// Receive messages from channel. The channel returns a struct `ConsumerMessage` which contains message and the consumer from where
// the message was received. It's not necessary here since we have 1 single consumer, but the channel could be
// shared across multiple consumers as well
for cm := range channel {
consumer := cm.Consumer
msg := cm.Message
fmt.Printf("Consumer %s received a message, msgId: %v, content: '%s'\n",
consumer.Name(), msg.ID(), string(msg.Payload()))
consumer.Ack(msg)
}
}
Receive message with timeout
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
if err != nil {
log.Fatal(err)
}
defer client.Close()
topic := "test-topic-with-no-messages"
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
// create consumer
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub1",
Type: pulsar.Shared,
})
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
// receive message with a timeout
msg, err := consumer.Receive(ctx)
if err != nil {
log.Fatal(err)
}
fmt.Println(msg.Payload())
Use schema in consumer
type testJSON struct {
ID int `json:"id"`
Name string `json:"name"`
}
var (
exampleSchemaDef = "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
"\"fields\":[{\"name\":\"ID\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}"
)
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
if err != nil {
log.Fatal(err)
}
defer client.Close()
var s testJSON
consumerJS := pulsar.NewJSONSchema(exampleSchemaDef, nil)
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "jsonTopic",
SubscriptionName: "sub-1",
Schema: consumerJS,
SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
})
if err != nil {
log.Fatal(err)
}
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}
err = msg.GetSchemaValue(&s)
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
How to use Prometheus metrics in consumer
In this guide, This section demonstrates how to create a simple Pulsar consumer application that exposes Prometheus metrics via HTTP.
- Write a simple consumer application.
// Create a Pulsar client
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
if err != nil {
log.Fatal(err)
}
defer client.Close()
// Start a separate goroutine for Prometheus metrics
// In this case, Prometheus metrics can be accessed via http://localhost:2112/metrics
go func() {
prometheusPort := 2112
log.Printf("Starting Prometheus metrics at http://localhost:%v/metrics\n", prometheusPort)
http.Handle("/metrics", promhttp.Handler())
err = http.ListenAndServe(":"+strconv.Itoa(prometheusPort), nil)
if err != nil {
log.Fatal(err)
}
}()
// Create a consumer
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "topic-1",
SubscriptionName: "sub-1",
Type: pulsar.Shared,
})
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
ctx := context.Background()
// Write your business logic here
// In this case, you build a simple Web server. You can consume messages by requesting http://localhost:8083/consume
webPort := 8083
http.HandleFunc("/consume", func(w http.ResponseWriter, r *http.Request) {
msg, err := consumer.Receive(ctx)
if err != nil {
log.Fatal(err)
} else {
log.Printf("Received message msgId: %v -- content: '%s'\n", msg.ID(), string(msg.Payload()))
fmt.Fprintf(w, "Received message msgId: %v -- content: '%s'\n", msg.ID(), string(msg.Payload()))
consumer.Ack(msg)
}
})
err = http.ListenAndServe(":"+strconv.Itoa(webPort), nil)
if err != nil {
log.Fatal(err)
}
- To scrape metrics from applications, configure a local running Prometheus instance using a configuration file (
prometheus.yml
).
scrape_configs:
- job_name: pulsar-client-go-metrics
scrape_interval: 10s
static_configs:
- targets:
- localhost: 2112
Now you can query Pulsar client metrics on Prometheus.
Consumer configuration
All available options of ConsumerOptions
are here.
Readers
Pulsar readers process messages from Pulsar topics. Readers are different from consumers because with readers you need to explicitly specify which message in the stream you want to begin with (consumers, on the other hand, automatically begin with the most recent unacked message). You can configure Go readers using a ReaderOptions
object. Here's an example:
reader, err := client.CreateReader(pulsar.ReaderOptions{
Topic: "topic-1",
StartMessageID: pulsar.EarliestMessageID(),
})
if err != nil {
log.Fatal(err)
}
defer reader.Close()
Reader operations
All available methods of the Reader
interface are here.
Reader example
How to use reader to read 'next' message
Here's an example usage of a Go reader that uses the Next()
method to process incoming messages:
import (
"context"
"fmt"
"log"
"github.com/apache/pulsar-client-go/pulsar"
)
func main() {
client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
if err != nil {
log.Fatal(err)
}
defer client.Close()
reader, err := client.CreateReader(pulsar.ReaderOptions{
Topic: "topic-1",
StartMessageID: pulsar.EarliestMessageID(),
})
if err != nil {
log.Fatal(err)
}
defer reader.Close()
for reader.HasNext() {
msg, err := reader.Next(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))
}
}
In the example above, the reader begins reading from the earliest available message (specified by pulsar.EarliestMessage
). The reader can also begin reading from the latest message (pulsar.LatestMessage
) or some other message ID specified by bytes using the DeserializeMessageID
function, which takes a byte array and returns a MessageID
object. Here's an example:
lastSavedId := // Read last saved message id from external store as byte[]
reader, err := client.CreateReader(pulsar.ReaderOptions{
Topic: "my-golang-topic",
StartMessageID: pulsar.DeserializeMessageID(lastSavedId),
})
Use reader to read specific message
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
if err != nil {
log.Fatal(err)
}
defer client.Close()
topic := "topic-1"
ctx := context.Background()
// create producer
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: topic,
DisableBatching: true,
})
if err != nil {
log.Fatal(err)
}
defer producer.Close()
// send 10 messages
msgIDs := [10]pulsar.MessageID{}
for i := 0; i < 10; i++ {
msgID, _ := producer.Send(ctx, &pulsar.ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
})
msgIDs[i] = msgID
}
// create reader on 5th message (not included)
reader, err := client.CreateReader(pulsar.ReaderOptions{
Topic: topic,
StartMessageID: msgIDs[4],
StartMessageIDInclusive: false,
})
if err != nil {
log.Fatal(err)
}
defer reader.Close()
// receive the remaining 5 messages
for i := 5; i < 10; i++ {
msg, err := reader.Next(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("Read %d-th msg: %s\n", i, string(msg.Payload()))
}
// create reader on 5th message (included)
readerInclusive, err := client.CreateReader(pulsar.ReaderOptions{
Topic: topic,
StartMessageID: msgIDs[4],
StartMessageIDInclusive: true,
})
if err != nil {
log.Fatal(err)
}
defer readerInclusive.Close()
Reader configuration
All available options of ReaderOptions
are here.
Messages
The Pulsar Go client provides a ProducerMessage
interface that you can use to construct messages to producers on Pulsar topics. Here's an example message:
msg := pulsar.ProducerMessage{
Payload: []byte("Here is some message data"),
Key: "message-key",
Properties: map[string]string{
"foo": "bar",
},
EventTime: time.Now(),
ReplicationClusters: []string{"cluster1", "cluster3"},
}
if _, err := producer.send(msg); err != nil {
log.Fatalf("Could not publish message due to: %v", err)
}
All methods of ProducerMessage
object are here.
TLS encryption and authentication
To use TLS encryption and mTLS authentication, you need to configure your client to do so:
- Use
pulsar+ssl
URL type - Set
TLSTrustCertsFilePath
to the path of the TLS certs used by your client and the Pulsar broker - Configure
Authentication
option
Here's an example:
opts := pulsar.ClientOptions{
URL: "pulsar+ssl://my-cluster.com:6651",
TLSTrustCertsFilePath: "/path/to/certs/my-cert.csr",
Authentication: pulsar.NewAuthenticationTLS("my-cert.pem", "my-key.pem"),
}
OAuth2 authentication
To use OAuth2 authentication, you need to configure your client to perform the following operations.
This example shows how to configure OAuth2 authentication.
oauth := pulsar.NewAuthenticationOAuth2(map[string]string{
"type": "client_credentials",
"issuerUrl": "https://dev-kt-aa9ne.us.auth0.com",
"audience": "https://dev-kt-aa9ne.us.auth0.com/api/v2/",
"privateKey": "/path/to/privateKey",
"clientId": "0Xx...Yyxeny",
})
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://my-cluster:6650",
Authentication: oauth,
})