Skip to main content
Version: Next

Pulsar Go client

You can use Pulsar Go client to create Pulsar producers, consumers, and readers in Golang.

API docs are available on the Godoc page

Installationโ€‹

Install go packageโ€‹

You can get the pulsar library by using go get or use it with go module.

Download the library of Go client to your local environment:

go get -u "github.com/apache/pulsar-client-go/pulsar"

Once installed locally, you can import it into your project:

import "github.com/apache/pulsar-client-go/pulsar"

Use with go module:

mkdir test_dir && cd test_dir

Write a sample script in the test_dir directory (such as test_example.go) and write package main at the beginning of the file.

go mod init test_dir 
go mod tidy && go mod download
go build test_example.go
./test_example

Connection URLsโ€‹

To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL.

You can assign Pulsar protocol URLs to specific clusters and use the pulsar scheme. The following is an example of localhost with the default port 6650:

pulsar://localhost:6650

If you have multiple brokers, separate IP:port by commas:

pulsar://localhost:6550,localhost:6651,localhost:6652

If you use TLS authentication, add +ssl in the scheme:

pulsar+ssl://pulsar.us-west.example.com:6651

Create a clientโ€‹

To interact with Pulsar, you need a Client object first. You can create a client object using the NewClient function, passing in a ClientOptions object (more on configuration below). Here's an example:

import (
"log"
"time"

"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
OperationTimeout: 30 * time.Second,
ConnectionTimeout: 30 * time.Second,
})
if err != nil {
log.Fatalf("Could not instantiate Pulsar client: %v", err)
}

defer client.Close()
}

If you have multiple brokers, you can initiate a client object as below.

import (
"log"
"time"
"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650,localhost:6651,localhost:6652",
OperationTimeout: 30 * time.Second,
ConnectionTimeout: 30 * time.Second,
})
if err != nil {
log.Fatalf("Could not instantiate Pulsar client: %v", err)
}

defer client.Close()
}

All configurable parameters for ClientOptions are here.

Producersโ€‹

Pulsar producers publish messages to Pulsar topics. You can configure Go producers using a ProducerOptions object. Here's an example:

producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "my-topic",
})

if err != nil {
log.Fatal(err)
}

_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
Payload: []byte("hello"),
})

defer producer.Close()

if err != nil {
fmt.Println("Failed to publish message", err)
}
fmt.Println("Published message")

Producer operationsโ€‹

All available methods of Producer interface are here.

Producer Exampleโ€‹

How to use message router in producerโ€‹

client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})

if err != nil {
log.Fatal(err)
}
defer client.Close()

producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "my-partitioned-topic",
MessageRouter: func(msg *pulsar.ProducerMessage, tm pulsar.TopicMetadata) int {
fmt.Println("Topic has", tm.NumPartitions(), "partitions. Routing message ", msg, " to partition 2.")
// always push msg to partition 2
return 2
},
})

if err != nil {
log.Fatal(err)
}
defer producer.Close()

for i := 0; i < 10; i++ {
if msgId, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
Payload: []byte(fmt.Sprintf("message-%d", i)),
}); err != nil {
log.Fatal(err)
} else {
log.Println("Published message: ", msgId)
}
}

// subscribe a specific partition of a topic
// for demos only, not recommend to subscribe a specific partition
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
// pulsar partition is a special topic has the suffix '-partition-xx'
Topic: "my-partitioned-topic-partition-2",
SubscriptionName: "my-sub",
Type: pulsar.Shared,
})
if err != nil {
log.Fatal(err)
}
defer consumer.Close()

for i := 0; i < 10; i++ {
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("Received message msgId: %#v -- content: '%s'\n", msg.ID(), string(msg.Payload()))
consumer.Ack(msg)
}

How to use schema interface in producerโ€‹

type testJSON struct {
ID int `json:"id"`
Name string `json:"name"`
}

var (
exampleSchemaDef = "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
"\"fields\":[{\"name\":\"ID\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}"
)

client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
if err != nil {
log.Fatal(err)
}
defer client.Close()

properties := make(map[string]string)
properties["pulsar"] = "hello"
jsonSchemaWithProperties := pulsar.NewJSONSchema(exampleSchemaDef, properties)
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "jsonTopic",
Schema: jsonSchemaWithProperties,
})

if err != nil {
log.Fatal(err)
}

_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
Value: &testJSON{
ID: 100,
Name: "pulsar",
},
})
if err != nil {
log.Fatal(err)
}
producer.Close()

How to use delay relative in producerโ€‹

client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
if err != nil {
log.Fatal(err)
}
defer client.Close()

topicName := "topic-1"
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: topicName,
DisableBatching: true,
})
if err != nil {
log.Fatal(err)
}
defer producer.Close()

consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: topicName,
SubscriptionName: "subName",
Type: pulsar.Shared,
})
if err != nil {
log.Fatal(err)
}
defer consumer.Close()

ID, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
Payload: []byte(fmt.Sprintf("test")),
DeliverAfter: 3 * time.Second,
})
if err != nil {
log.Fatal(err)
}
fmt.Println(ID)

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
msg, err := consumer.Receive(ctx)
if err != nil {
log.Fatal(err)
}
fmt.Println(msg.Payload())
cancel()

ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
msg, err = consumer.Receive(ctx)
if err != nil {
log.Fatal(err)
}
fmt.Println(msg.Payload())
cancel()

How to use Prometheus metrics in producerโ€‹

Pulsar Go client registers client metrics using Prometheus. This section demonstrates how to create a simple Pulsar producer application that exposes Prometheus metrics via HTTP.

  1. Write a simple producer application.
// Create a Pulsar client
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
if err != nil {
log.Fatal(err)
}

defer client.Close()

// Start a separate goroutine for Prometheus metrics
// In this case, Prometheus metrics can be accessed via http://localhost:2112/metrics
go func() {
prometheusPort := 2112
log.Printf("Starting Prometheus metrics at http://localhost:%v/metrics\n", prometheusPort)
http.Handle("/metrics", promhttp.Handler())
err = http.ListenAndServe(":"+strconv.Itoa(prometheusPort), nil)
if err != nil {
log.Fatal(err)
}
}()

// Create a producer
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "topic-1",
})
if err != nil {
log.Fatal(err)
}

defer producer.Close()

ctx := context.Background()

// Write your business logic here
// In this case, you build a simple Web server. You can produce messages by requesting http://localhost:8082/produce
webPort := 8082
http.HandleFunc("/produce", func(w http.ResponseWriter, r *http.Request) {
msgId, err := producer.Send(ctx, &pulsar.ProducerMessage{
Payload: []byte(fmt.Sprintf("hello world")),
})
if err != nil {
log.Fatal(err)
} else {
log.Printf("Published message: %v", msgId)
fmt.Fprintf(w, "Published message: %v", msgId)
}
})

err = http.ListenAndServe(":"+strconv.Itoa(webPort), nil)
if err != nil {
log.Fatal(err)
}
  1. To scrape metrics from applications, configure a local running Prometheus instance using a configuration file (prometheus.yml).
scrape_configs:
- job_name: pulsar-client-go-metrics
scrape_interval: 10s
static_configs:
- targets:
- localhost:2112

Now you can query Pulsar client metrics on Prometheus.

Producer configurationโ€‹

All available options of ProducerOptions are here.

Consumersโ€‹

Pulsar consumers subscribe to one or more Pulsar topics and listen for incoming messages produced on that topic/those topics. You can configure Go consumers using a ConsumerOptions object. Here's a basic example that uses channels:

consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "topic-1",
SubscriptionName: "my-sub",
Type: pulsar.Shared,
})
if err != nil {
log.Fatal(err)
}
defer consumer.Close()

for i := 0; i < 10; i++ {
// may block here
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}

fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))

consumer.Ack(msg)
}

if err := consumer.Unsubscribe(); err != nil {
log.Fatal(err)
}

Consumer operationsโ€‹

All available methods of Consumer interface are here.

Create single-topic consumerโ€‹

client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
if err != nil {
log.Fatal(err)
}

defer client.Close()

consumer, err := client.Subscribe(pulsar.ConsumerOptions{
// fill `Topic` field will create a single-topic consumer
Topic: "topic-1",
SubscriptionName: "my-sub",
Type: pulsar.Shared,
})
if err != nil {
log.Fatal(err)
}
defer consumer.Close()

Create regex-topic consumerโ€‹

client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
defer client.Close()

topicsPattern := "persistent://public/default/topic.*"
opts := pulsar.ConsumerOptions{
// fill `TopicsPattern` field will create a regex consumer
TopicsPattern: topicsPattern,
SubscriptionName: "regex-sub",
}

consumer, err := client.Subscribe(opts)
if err != nil {
log.Fatal(err)
}
defer consumer.Close()

Create multi-topic consumerโ€‹

client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
if err != nil {
log.Fatal(err)
}

topics := []string{"topic-1", "topic-2"}
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
// fill `Topics` field will create a multi-topic consumer
Topics: topics,
SubscriptionName: "multi-topic-sub",
})
if err != nil {
log.Fatal(err)
}
defer consumer.Close()

Create consumer listenerโ€‹

import (
"fmt"
"log"

"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
if err != nil {
log.Fatal(err)
}

defer client.Close()

// we can listen this channel
channel := make(chan pulsar.ConsumerMessage, 100)

options := pulsar.ConsumerOptions{
Topic: "topic-1",
SubscriptionName: "my-subscription",
Type: pulsar.Shared,
// fill `MessageChannel` field will create a listener
MessageChannel: channel,
}

consumer, err := client.Subscribe(options)
if err != nil {
log.Fatal(err)
}

defer consumer.Close()

// Receive messages from channel. The channel returns a struct `ConsumerMessage` which contains message and the consumer from where
// the message was received. It's not necessary here since we have 1 single consumer, but the channel could be
// shared across multiple consumers as well
for cm := range channel {
consumer := cm.Consumer
msg := cm.Message
fmt.Printf("Consumer %s received a message, msgId: %v, content: '%s'\n",
consumer.Name(), msg.ID(), string(msg.Payload()))

consumer.Ack(msg)
}
}

Receive message with timeoutโ€‹

client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
if err != nil {
log.Fatal(err)
}
defer client.Close()

topic := "test-topic-with-no-messages"
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()

// create consumer
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub1",
Type: pulsar.Shared,
})
if err != nil {
log.Fatal(err)
}
defer consumer.Close()

// receive message with a timeout
msg, err := consumer.Receive(ctx)
if err != nil {
log.Fatal(err)
}
fmt.Println(msg.Payload())

Use schema in consumerโ€‹

type testJSON struct {
ID int `json:"id"`
Name string `json:"name"`
}

var (
exampleSchemaDef = "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
"\"fields\":[{\"name\":\"ID\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}"
)

client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
if err != nil {
log.Fatal(err)
}
defer client.Close()

var s testJSON

consumerJS := pulsar.NewJSONSchema(exampleSchemaDef, nil)
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "jsonTopic",
SubscriptionName: "sub-1",
Schema: consumerJS,
SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
})
if err != nil {
log.Fatal(err)
}

msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}

err = msg.GetSchemaValue(&s)
if err != nil {
log.Fatal(err)
}
defer consumer.Close()

How to use Prometheus metrics in consumerโ€‹

In this guide, This section demonstrates how to create a simple Pulsar consumer application that exposes Prometheus metrics via HTTP.

  1. Write a simple consumer application.
// Create a Pulsar client
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
if err != nil {
log.Fatal(err)
}

defer client.Close()

// Start a separate goroutine for Prometheus metrics
// In this case, Prometheus metrics can be accessed via http://localhost:2112/metrics
go func() {
prometheusPort := 2112
log.Printf("Starting Prometheus metrics at http://localhost:%v/metrics\n", prometheusPort)
http.Handle("/metrics", promhttp.Handler())
err = http.ListenAndServe(":"+strconv.Itoa(prometheusPort), nil)
if err != nil {
log.Fatal(err)
}
}()

// Create a consumer
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "topic-1",
SubscriptionName: "sub-1",
Type: pulsar.Shared,
})
if err != nil {
log.Fatal(err)
}

defer consumer.Close()

ctx := context.Background()

// Write your business logic here
// In this case, you build a simple Web server. You can consume messages by requesting http://localhost:8083/consume
webPort := 8083
http.HandleFunc("/consume", func(w http.ResponseWriter, r *http.Request) {
msg, err := consumer.Receive(ctx)
if err != nil {
log.Fatal(err)
} else {
log.Printf("Received message msgId: %v -- content: '%s'\n", msg.ID(), string(msg.Payload()))
fmt.Fprintf(w, "Received message msgId: %v -- content: '%s'\n", msg.ID(), string(msg.Payload()))
consumer.Ack(msg)
}
})

err = http.ListenAndServe(":"+strconv.Itoa(webPort), nil)
if err != nil {
log.Fatal(err)
}
  1. To scrape metrics from applications, configure a local running Prometheus instance using a configuration file (prometheus.yml).
scrape_configs:
- job_name: pulsar-client-go-metrics
scrape_interval: 10s
static_configs:
- targets:
- localhost: 2112

Now you can query Pulsar client metrics on Prometheus.

Consumer configurationโ€‹

All available options of ConsumerOptions are here.

Readersโ€‹

Pulsar readers process messages from Pulsar topics. Readers are different from consumers because with readers you need to explicitly specify which message in the stream you want to begin with (consumers, on the other hand, automatically begin with the most recent unacked message). You can configure Go readers using a ReaderOptions object. Here's an example:

reader, err := client.CreateReader(pulsar.ReaderOptions{
Topic: "topic-1",
StartMessageID: pulsar.EarliestMessageID(),
})
if err != nil {
log.Fatal(err)
}
defer reader.Close()

Reader operationsโ€‹

All available methods of the Reader interface are here.

Reader exampleโ€‹

Here's an example usage of a Go reader that uses the Next() method to process incoming messages:

import (
"context"
"fmt"
"log"

"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
if err != nil {
log.Fatal(err)
}

defer client.Close()

reader, err := client.CreateReader(pulsar.ReaderOptions{
Topic: "topic-1",
StartMessageID: pulsar.EarliestMessageID(),
})
if err != nil {
log.Fatal(err)
}
defer reader.Close()

for reader.HasNext() {
msg, err := reader.Next(context.Background())
if err != nil {
log.Fatal(err)
}

fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))
}
}

In the example above, the reader begins reading from the earliest available message (specified by pulsar.EarliestMessage). The reader can also begin reading from the latest message (pulsar.LatestMessage) or some other message ID specified by bytes using the DeserializeMessageID function, which takes a byte array and returns a MessageID object. Here's an example:

lastSavedId := // Read last saved message id from external store as byte[]

reader, err := client.CreateReader(pulsar.ReaderOptions{
Topic: "my-golang-topic",
StartMessageID: pulsar.DeserializeMessageID(lastSavedId),
})

Use reader to read specific messageโ€‹

client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})

if err != nil {
log.Fatal(err)
}
defer client.Close()

topic := "topic-1"
ctx := context.Background()

// create producer
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: topic,
DisableBatching: true,
})
if err != nil {
log.Fatal(err)
}
defer producer.Close()

// send 10 messages
msgIDs := [10]pulsar.MessageID{}
for i := 0; i < 10; i++ {
msgID, _ := producer.Send(ctx, &pulsar.ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
})
msgIDs[i] = msgID
}

// create reader on 5th message (not included)
reader, err := client.CreateReader(pulsar.ReaderOptions{
Topic: topic,
StartMessageID: msgIDs[4],
StartMessageIDInclusive: false,
})

if err != nil {
log.Fatal(err)
}
defer reader.Close()

// receive the remaining 5 messages
for i := 5; i < 10; i++ {
msg, err := reader.Next(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("Read %d-th msg: %s\n", i, string(msg.Payload()))
}
// create reader on 5th message (included)
readerInclusive, err := client.CreateReader(pulsar.ReaderOptions{
Topic: topic,
StartMessageID: msgIDs[4],
StartMessageIDInclusive: true,
})

if err != nil {
log.Fatal(err)
}
defer readerInclusive.Close()

Reader configurationโ€‹

All available options of ReaderOptions are here.

Messagesโ€‹

The Pulsar Go client provides a ProducerMessage interface that you can use to construct messages to producers on Pulsar topics. Here's an example message:

msg := pulsar.ProducerMessage{
Payload: []byte("Here is some message data"),
Key: "message-key",
Properties: map[string]string{
"foo": "bar",
},
EventTime: time.Now(),
ReplicationClusters: []string{"cluster1", "cluster3"},
}

if _, err := producer.send(msg); err != nil {
log.Fatalf("Could not publish message due to: %v", err)
}

All methods of ProducerMessage object are here.

TLS encryption and authenticationโ€‹

To use TLS encryption, you need to configure your client to do so:

  • Use pulsar+ssl URL type
  • Set TLSTrustCertsFilePath to the path to the TLS certs used by your client and the Pulsar broker
  • Configure Authentication option

Here's an example:

opts := pulsar.ClientOptions{
URL: "pulsar+ssl://my-cluster.com:6651",
TLSTrustCertsFilePath: "/path/to/certs/my-cert.csr",
Authentication: pulsar.NewAuthenticationTLS("my-cert.pem", "my-key.pem"),
}

OAuth2 authenticationโ€‹

To use OAuth2 authentication, you need to configure your client to perform the following operations.

This example shows how to configure OAuth2 authentication.

oauth := pulsar.NewAuthenticationOAuth2(map[string]string{
"type": "client_credentials",
"issuerUrl": "https://dev-kt-aa9ne.us.auth0.com",
"audience": "https://dev-kt-aa9ne.us.auth0.com/api/v2/",
"privateKey": "/path/to/privateKey",
"clientId": "0Xx...Yyxeny",
})
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://my-cluster:6650",
Authentication: oauth,
})