
Pulsar Go client

Tips: The CGo client has been deprecated since version 2.7.0.

You can use Pulsar Go client to create Pulsar producers, consumers, and readers in Go (aka Golang).

API docs are available as well. For standard API docs, consult the Godoc.

Installation​

Install go package​

You can get the Pulsar library with go get, or use it through Go modules.

Download the Go client library to your local environment:


$ go get -u "github.com/apache/pulsar-client-go/pulsar"

Once installed locally, you can import it into your project:


import "github.com/apache/pulsar-client-go/pulsar"

Use with go module:


$ mkdir test_dir && cd test_dir

Write a sample script in the test_dir directory (such as test_example.go) and write package main at the beginning of the file.


$ go mod init test_dir
$ go mod tidy && go mod download
$ go build test_example.go
$ ./test_example

Connection URLs​

To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL.

Pulsar protocol URLs are assigned to specific clusters, use the pulsar scheme, and have a default port of 6650. Here's an example for localhost:


pulsar://localhost:6650

If you have multiple brokers, you can set the URL as below.


pulsar://localhost:6650,localhost:6651,localhost:6652

A URL for a production Pulsar cluster may look something like this:


pulsar://pulsar.us-west.example.com:6650

If you're using TLS authentication, the URL will look something like this:


pulsar+ssl://pulsar.us-west.example.com:6651
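The URL forms above share one shape: a scheme (pulsar or pulsar+ssl), then one or more comma-separated host:port pairs. The following is a minimal sketch of splitting such a URL into its parts, using only the standard library. The helper parseServiceURL is hypothetical and is not part of the Pulsar client; the client does its own parsing, which may differ in detail.

```go
package main

import (
    "fmt"
    "strings"
)

// parseServiceURL is a hypothetical helper (not a Pulsar client API).
// It splits a service URL into its scheme and its list of host:port entries.
func parseServiceURL(raw string) (string, []string, error) {
    parts := strings.SplitN(raw, "://", 2)
    if len(parts) != 2 {
        return "", nil, fmt.Errorf("missing scheme in %q", raw)
    }
    scheme := parts[0]
    if scheme != "pulsar" && scheme != "pulsar+ssl" {
        return "", nil, fmt.Errorf("unsupported scheme %q", scheme)
    }
    var hosts []string
    for _, h := range strings.Split(parts[1], ",") {
        h = strings.TrimSpace(h)
        if h == "" {
            return "", nil, fmt.Errorf("empty host in %q", raw)
        }
        hosts = append(hosts, h)
    }
    return scheme, hosts, nil
}

func main() {
    scheme, hosts, err := parseServiceURL("pulsar://localhost:6650,localhost:6651,localhost:6652")
    if err != nil {
        fmt.Println("invalid URL:", err)
        return
    }
    fmt.Println("scheme:", scheme)
    fmt.Println("hosts:", hosts)
}
```

A check like this can catch a mistyped scheme or a stray comma before the URL is handed to the client.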

Create a client​

In order to interact with Pulsar, you'll first need a Client object. You can create a client object using the NewClient function, passing in a ClientOptions object (more on configuration below). Here's an example:


import (
"log"
"time"

"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
OperationTimeout: 30 * time.Second,
ConnectionTimeout: 30 * time.Second,
})
if err != nil {
log.Fatalf("Could not instantiate Pulsar client: %v", err)
}

defer client.Close()
}

If you have multiple brokers, you can initiate a client object as below.


import (
"log"
"time"
"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650,localhost:6651,localhost:6652",
OperationTimeout: 30 * time.Second,
ConnectionTimeout: 30 * time.Second,
})
if err != nil {
log.Fatalf("Could not instantiate Pulsar client: %v", err)
}

defer client.Close()
}

The following configurable parameters are available for Pulsar clients:

Name | Description | Default
URL | Configure the service URL for the Pulsar service. If you have multiple brokers, you can set multiple Pulsar cluster addresses for a client. This parameter is required. | None
ConnectionTimeout | Timeout for the establishment of a TCP connection | 30s
OperationTimeout | Set the operation timeout. Producer-create, subscribe, and unsubscribe operations are retried until this interval elapses, after which the operation is marked as failed | 30s
Authentication | Configure the authentication provider. Example: Authentication: pulsar.NewAuthenticationTLS("my-cert.pem", "my-key.pem") | no authentication
TLSTrustCertsFilePath | Set the path to the trusted TLS certificate file |
TLSAllowInsecureConnection | Configure whether the Pulsar client accepts an untrusted TLS certificate from the broker | false
TLSValidateHostname | Configure whether the Pulsar client verifies the validity of the host name from the broker | false
ListenerName | Configure the net model for VPC users to connect to the Pulsar broker |
MaxConnectionsPerBroker | Maximum number of connections to a single broker that are kept in the pool | 1
CustomMetricsLabels | Add custom labels to all the metrics reported by this client instance |
Logger | Configure the logger used by the client | logrus.StandardLogger

Producers​

Pulsar producers publish messages to Pulsar topics. You can configure Go producers using a ProducerOptions object. Here's an example:


producer, err := client.CreateProducer(pulsar.ProducerOptions{
    Topic: "my-topic",
})
if err != nil {
    log.Fatal(err)
}
// Close the producer when the surrounding function returns
defer producer.Close()

_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
    Payload: []byte("hello"),
})
if err != nil {
    fmt.Println("Failed to publish message", err)
} else {
    fmt.Println("Published message")
}

Producer operations​

Pulsar Go producers have the following methods available:

Method | Description | Return type
Topic() | Fetches the producer's topic | string
Name() | Fetches the producer's name | string
Send(context.Context, *ProducerMessage) | Publishes a message to the producer's topic. This call blocks until the message is acknowledged by the Pulsar broker, or returns an error if the timeout set by SendTimeout in the producer's configuration is exceeded. | (MessageID, error)
SendAsync(context.Context, *ProducerMessage, func(MessageID, *ProducerMessage, error)) | Publishes a message asynchronously. The call does not wait for the broker's acknowledgment; the callback is invoked with the message ID, the message, and any error once the publish completes or fails. |
LastSequenceID() | Gets the last sequence ID that was published by this producer. This represents either the automatically assigned or the custom sequence ID (set on the ProducerMessage) that was published and acknowledged by the broker. | int64
Flush() | Flushes all the messages buffered in the client and waits until all messages have been successfully persisted. | error
Close() | Closes the producer and releases all resources allocated to it. After Close() is called, the producer accepts no more messages. This method blocks until all pending publish requests have been persisted by Pulsar. If an error occurs, pending writes are not retried. |

Producer Example​

How to use message router in producer​


client, err := pulsar.NewClient(pulsar.ClientOptions{
    URL: serviceURL, // serviceURL is assumed to be defined elsewhere
})

if err != nil {
    log.Fatal(err)
}
defer client.Close()

// Only subscribe on the specific partition
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
    Topic:            "my-partitioned-topic-partition-2",
    SubscriptionName: "my-sub",
})

if err != nil {
    log.Fatal(err)
}
defer consumer.Close()

producer, err := client.CreateProducer(pulsar.ProducerOptions{
    Topic: "my-partitioned-topic",
    // Route every message to partition 2, the partition the consumer above reads from
    MessageRouter: func(msg *pulsar.ProducerMessage, tm pulsar.TopicMetadata) int {
        fmt.Println("Routing message ", msg, " -- Partitions: ", tm.NumPartitions())
        return 2
    },
})

if err != nil {
    log.Fatal(err)
}
defer producer.Close()
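The router above always returns partition 2. A router more often derives the partition from the message, for example from its key, so that messages with the same key land on the same partition. The sketch below shows one way to do that with an FNV hash from the standard library. The helper partitionForKey is hypothetical and is not a Pulsar API, and the choice of FNV is an assumption for illustration, not the client's own hashing scheme.

```go
package main

import (
    "fmt"
    "hash/fnv"
)

// partitionForKey is a hypothetical helper (not a Pulsar client API).
// It maps a message key to a partition index in [0, numPartitions).
func partitionForKey(key string, numPartitions uint32) int {
    if numPartitions == 0 {
        return 0 // nothing to choose from; fall back to partition 0
    }
    h := fnv.New32a()
    h.Write([]byte(key)) // Write on a hash.Hash never returns an error
    return int(h.Sum32() % numPartitions)
}

func main() {
    // The same key always maps to the same partition.
    for _, key := range []string{"order-1", "order-2", "order-1"} {
        fmt.Printf("key=%s -> partition %d\n", key, partitionForKey(key, 3))
    }
}
```

Inside a MessageRouter function, the body could then return partitionForKey(msg.Key, ...), with the partition count taken from tm.NumPartitions() and converted if its type differs.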

How to use schema interface in producer​


type testJSON struct {
    ID   int    `json:"id"`
    Name string `json:"name"`
}


var (
    exampleSchemaDef = "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
        "\"fields\":[{\"name\":\"ID\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}"
)


client, err := pulsar.NewClient(pulsar.ClientOptions{
    URL: "pulsar://localhost:6650",
})
if err != nil {
    log.Fatal(err)
}
defer client.Close()

properties := make(map[string]string)
properties["pulsar"] = "hello"
jsonSchemaWithProperties := pulsar.NewJSONSchema(exampleSchemaDef, properties)
producer, err := client.CreateProducer(pulsar.ProducerOptions{
    Topic:  "jsonTopic",
    Schema: jsonSchemaWithProperties,
})
if err != nil {
    log.Fatal(err)
}
defer producer.Close()

// Value (rather than Payload) is used for schema messages
_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
    Value: &testJSON{
        ID:   100,
        Name: "pulsar",
    },
})
if err != nil {
    log.Fatal(err)
}

How to use relative delayed delivery in producer


client, err := pulsar.NewClient(pulsar.ClientOptions{
    URL: "pulsar://localhost:6650",
})
if err != nil {
    log.Fatal(err)
}
defer client.Close()

// newTopicName is a helper that returns a unique topic name
topicName := newTopicName()
producer, err := client.CreateProducer(pulsar.ProducerOptions{
    Topic:           topicName,
    DisableBatching: true,
})
if err != nil {
    log.Fatal(err)
}
defer producer.Close()

consumer, err := client.Subscribe(pulsar.ConsumerOptions{
    Topic:            topicName,
    SubscriptionName: "subName",
    Type:             pulsar.Shared,
})
if err != nil {
    log.Fatal(err)
}
defer consumer.Close()

ID, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
    Payload:      []byte("test"),
    DeliverAfter: 3 * time.Second,
})
if err != nil {
    log.Fatal(err)
}
fmt.Println(ID)

// The message is delayed by 3 seconds, so a receive with a 1-second timeout is expected to fail
ctx, canc := context.WithTimeout(context.Background(), 1*time.Second)
msg, err := consumer.Receive(ctx)
canc()
if err == nil {
    log.Fatalf("expected a timeout, but received: %s", msg.Payload())
}

// A receive with a 5-second timeout is expected to return the delayed message
ctx, canc = context.WithTimeout(context.Background(), 5*time.Second)
msg, err = consumer.Receive(ctx)
canc()
if err != nil {
    log.Fatal(err)
}
fmt.Println(string(msg.Payload()))

How to use Prometheus metrics in producer​

Pulsar Go client registers client metrics using Prometheus. This section demonstrates how to create a simple Pulsar producer application that exposes Prometheus metrics via HTTP.

  1. Write a simple producer application.

// Create a Pulsar client
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
if err != nil {
log.Fatal(err)
}

defer client.Close()

// Start a separate goroutine for Prometheus metrics
// In this case, Prometheus metrics can be accessed via http://localhost:2112/metrics
go func() {
    prometheusPort := 2112
    log.Printf("Starting Prometheus metrics at http://localhost:%v/metrics\n", prometheusPort)
    http.Handle("/metrics", promhttp.Handler())
    // Use a goroutine-local err so the outer err variable is not shared across goroutines
    if err := http.ListenAndServe(":"+strconv.Itoa(prometheusPort), nil); err != nil {
        log.Fatal(err)
    }
}()

// Create a producer
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "topic-1",
})
if err != nil {
log.Fatal(err)
}

defer producer.Close()

ctx := context.Background()

// Write your business logic here
// In this case, you build a simple Web server. You can produce messages by requesting http://localhost:8082/produce
webPort := 8082
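http.HandleFunc("/produce", func(w http.ResponseWriter, r *http.Request) {
    msgId, err := producer.Send(ctx, &pulsar.ProducerMessage{
        Payload: []byte("hello world"),
    })
    if err != nil {
        // Report the failure to the caller instead of terminating the whole process
        log.Printf("Failed to publish message: %v", err)
        http.Error(w, "failed to publish message", http.StatusInternalServerError)
        return
    }
    log.Printf("Published message: %v", msgId)
    fmt.Fprintf(w, "Published message: %v", msgId)
})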

err = http.ListenAndServe(":"+strconv.Itoa(webPort), nil)
if err != nil {
log.Fatal(err)
}

  2. To scrape metrics from applications, configure a locally running Prometheus instance using a configuration file (prometheus.yml).

scrape_configs:
  - job_name: pulsar-client-go-metrics
    scrape_interval: 10s
    static_configs:
      - targets:
          - localhost:2112

Now you can query Pulsar client metrics on Prometheus.
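One detail of the producer application above is worth noting: both http.ListenAndServe calls pass a nil handler, so both ports serve the default mux, and /metrics and /produce are reachable on either port. If that is not wanted, each server can get its own http.ServeMux. The following is a minimal sketch of that separation using only the standard library. The handlers are placeholders standing in for promhttp.Handler() and the /produce handler, and the helpers newMuxes and statusFor are hypothetical names introduced for illustration.

```go
package main

import (
    "fmt"
    "net/http"
    "net/http/httptest"
)

// newMuxes returns one mux for metrics and one for application traffic.
// Both handlers are placeholders for the real ones in the application.
func newMuxes() (*http.ServeMux, *http.ServeMux) {
    metricsMux := http.NewServeMux()
    metricsMux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
        fmt.Fprintln(w, "# placeholder metrics")
    })

    appMux := http.NewServeMux()
    appMux.HandleFunc("/produce", func(w http.ResponseWriter, r *http.Request) {
        fmt.Fprintln(w, "placeholder publish")
    })
    return metricsMux, appMux
}

// statusFor sends an in-memory GET request to h and returns the status code.
func statusFor(h http.Handler, path string) int {
    rec := httptest.NewRecorder()
    req := httptest.NewRequest(http.MethodGet, path, nil)
    h.ServeHTTP(rec, req)
    return rec.Code
}

func main() {
    metricsMux, appMux := newMuxes()
    fmt.Println("metrics mux:", statusFor(metricsMux, "/metrics"), statusFor(metricsMux, "/produce"))
    fmt.Println("app mux:", statusFor(appMux, "/produce"), statusFor(appMux, "/metrics"))
    // In a real program, each mux would be passed to its own server, for example
    // http.ListenAndServe(":2112", metricsMux) and http.ListenAndServe(":8082", appMux).
}
```

With separate muxes, a request for /produce on the metrics port gets a 404 instead of publishing a message.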

Producer configuration​

Name | Description | Default
Topic | Specifies the topic this producer will publish to. This argument is required when constructing the producer. |
Name | Specifies a name for the producer. If not assigned, the system generates a globally unique name, which can be accessed with Producer.Name(). |
Properties | Attaches a set of application-defined properties to the producer. These properties are visible in the topic stats. |
SendTimeout | Sets the timeout for a message that is not acknowledged by the server. | 30s
DisableBlockIfQueueFull | Controls whether Send and SendAsync block if the producer's message queue is full. | false
MaxPendingMessages | Sets the maximum size of the queue holding the messages pending acknowledgment from the broker. |
HashingScheme | Changes the hashing scheme used to choose the partition on which to publish a particular message. | JavaStringHash
CompressionType | Sets the compression type for the producer. | not compressed
CompressionLevel | Defines the desired compression level. Options: Default, Faster, and Better. | Default
MessageRouter | Sets a custom message routing policy by passing an implementation of MessageRouter. |
DisableBatching | Controls whether automatic batching of messages is disabled for the producer (batching is enabled by default). | false
BatchingMaxPublishDelay | Sets the time period within which sent messages are batched. | 1ms
BatchingMaxMessages | Sets the maximum number of messages permitted in a batch. | 1000
BatchingMaxSize | Sets the maximum number of bytes permitted in a batch. | 128KB
Schema | Sets a custom schema type by passing an implementation of Schema. | []byte
Interceptors | A chain of interceptors. These interceptors are called at the points defined in the ProducerInterceptor interface. | None
MaxReconnectToBroker | Sets the maximum number of reconnectToBroker retries. | unlimited
BatcherBuilderType | Sets the batch builder type. This is used to create a batch container when batching is enabled. Options: DefaultBatchBuilder and KeyBasedBatchBuilder. | DefaultBatchBuilder
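The three batching limits in the table (BatchingMaxPublishDelay, BatchingMaxMessages, BatchingMaxSize) work together. The sketch below models how they might interact, under the assumption that a batch is sent as soon as any one limit is reached. The type batchLimits and its method are hypothetical and are not part of the Pulsar client; the values in defaultLimits are the defaults listed in the table.

```go
package main

import (
    "fmt"
    "time"
)

// batchLimits mirrors the three batching settings from the table.
// It is a hypothetical type for illustration, not a Pulsar client type.
type batchLimits struct {
    MaxPublishDelay time.Duration
    MaxMessages     int
    MaxSizeBytes    int
}

// defaultLimits uses the default values listed in the configuration table.
var defaultLimits = batchLimits{
    MaxPublishDelay: time.Millisecond,
    MaxMessages:     1000,
    MaxSizeBytes:    128 * 1024,
}

// shouldFlush reports whether a batch holding the given number of messages
// and bytes, open for the given duration, has reached any of the limits.
// Assumption: reaching any single limit is enough to send the batch.
func (l batchLimits) shouldFlush(messages, sizeBytes int, age time.Duration) bool {
    return messages >= l.MaxMessages ||
        sizeBytes >= l.MaxSizeBytes ||
        age >= l.MaxPublishDelay
}

func main() {
    fmt.Println(defaultLimits.shouldFlush(10, 2048, 200*time.Microsecond)) // under every limit
    fmt.Println(defaultLimits.shouldFlush(1000, 2048, 200*time.Microsecond)) // message count reached
    fmt.Println(defaultLimits.shouldFlush(10, 2048, 2*time.Millisecond))     // delay reached
}
```

Under this model, raising BatchingMaxPublishDelay trades latency for larger batches, since the count and size limits then have more time to fill.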

Consumers​

Pulsar consumers subscribe to one or more Pulsar topics and listen for incoming messages produced on those topics. You can configure Go consumers using a ConsumerOptions object. Here's a basic example that uses the Receive method:


consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "topic-1",
SubscriptionName: "my-sub",
Type: pulsar.Shared,
})
if err != nil {
log.Fatal(err)
}
defer consumer.Close()

for i := 0; i < 10; i++ {
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}

fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))

consumer.Ack(msg)
}

if err := consumer.Unsubscribe(); err != nil {
log.Fatal(err)
}

Consumer operations​

Pulsar Go consumers have the following methods available:

Method | Description | Return type
Subscription() | Returns the consumer's subscription name | string
Unsubscribe() | Unsubscribes the consumer from the assigned topic. Returns an error if the unsubscribe operation fails. | error
Receive(context.Context) | Receives a single message from the topic. This method blocks until a message is available. | (Message, error)
Chan() | Returns a channel from which to consume messages. | <-chan ConsumerMessage
Ack(Message) | Acknowledges a message to the Pulsar broker |
AckID(MessageID) | Acknowledges a message to the Pulsar broker by message ID |
ReconsumeLater(msg Message, delay time.Duration) | Marks a message for redelivery after a custom delay |
Nack(Message) | Acknowledges the failure to process a single message. |
NackID(MessageID) | Acknowledges the failure to process a single message, by message ID. |
Seek(msgID MessageID) | Resets the subscription associated with this consumer to a specific message ID. The message ID can either be a specific message or represent the first or last message in the topic. | error
SeekByTime(time time.Time) | Resets the subscription associated with this consumer to a specific message publish time. | error
Close() | Closes the consumer, disabling its ability to receive messages from the broker |
Name() | Returns the name of the consumer | string

Receive example​

How to use regex consumer​


client, err := pulsar.NewClient(pulsar.ClientOptions{
    URL: "pulsar://localhost:6650",
})
if err != nil {
    log.Fatal(err)
}
defer client.Close()

// topicInRegex is assumed to be defined elsewhere and to match the pattern below
p, err := client.CreateProducer(pulsar.ProducerOptions{
    Topic:           topicInRegex,
    DisableBatching: true,
})
if err != nil {
    log.Fatal(err)
}
defer p.Close()

// namespace is assumed to be defined elsewhere, for example "public/default"
topicsPattern := fmt.Sprintf("persistent://%s/foo.*", namespace)
opts := pulsar.ConsumerOptions{
    TopicsPattern:    topicsPattern,
    SubscriptionName: "regex-sub",
}
consumer, err := client.Subscribe(opts)
if err != nil {
    log.Fatal(err)
}
defer consumer.Close()

How to use multi topics Consumer​


func newTopicName() string {
    return fmt.Sprintf("my-topic-%v", time.Now().Nanosecond())
}


topic1 := "topic-1"
topic2 := "topic-2"

client, err := pulsar.NewClient(pulsar.ClientOptions{
    URL: "pulsar://localhost:6650",
})
if err != nil {
    log.Fatal(err)
}
defer client.Close()

// Subscribe to both topics with a single consumer
topics := []string{topic1, topic2}
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
    Topics:           topics,
    SubscriptionName: "multi-topic-sub",
})
if err != nil {
    log.Fatal(err)
}
defer consumer.Close()

How to use consumer listener​


import (
"fmt"
"log"

"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
if err != nil {
log.Fatal(err)
}

defer client.Close()

channel := make(chan pulsar.ConsumerMessage, 100)

options := pulsar.ConsumerOptions{
Topic: "topic-1",
SubscriptionName: "my-subscription",
Type: pulsar.Shared,
}

options.MessageChannel = channel

consumer, err := client.Subscribe(options)
if err != nil {
log.Fatal(err)
}

defer consumer.Close()

// Receive messages from channel. The channel returns a struct which contains message and the consumer from where
// the message was received. It's not necessary here since we have 1 single consumer, but the channel could be
// shared across multiple consumers as well
for cm := range channel {
msg := cm.Message
fmt.Printf("Received message msgId: %v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))

consumer.Ack(msg)
}
}

How to use consumer receive timeout​


client, err := pulsar.NewClient(pulsar.ClientOptions{
    URL: "pulsar://localhost:6650",
})
if err != nil {
    log.Fatal(err)
}
defer client.Close()

topic := "test-topic-with-no-messages"

// create consumer
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
    Topic:            topic,
    SubscriptionName: "my-sub1",
    Type:             pulsar.Shared,
})
if err != nil {
    log.Fatal(err)
}
defer consumer.Close()

// Give up if no message arrives within 500 ms
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()

msg, err := consumer.Receive(ctx)
// Check the error before using msg: msg is nil when the receive times out
if err != nil {
    log.Fatal(err)
}
fmt.Println(string(msg.Payload()))
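The timeout pattern in this example does not depend on Pulsar: any blocking receive that honors a context can be bounded the same way. The sketch below reproduces the pattern with a plain Go channel as a stand-in for the consumer, so it runs without a broker. The functions receive, receiveWithTimeout, and isTimeout are hypothetical helpers, and the sketch assumes that the blocking call returns the context's error when the deadline passes.

```go
package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// shortTimeout is an arbitrary value chosen for this sketch.
const shortTimeout = 50 * time.Millisecond

// receive is a stand-in for a blocking receive call: it returns the next
// message, or the context's error once ctx is done.
func receive(ctx context.Context, messages <-chan string) (string, error) {
    select {
    case m := <-messages:
        return m, nil
    case <-ctx.Done():
        return "", ctx.Err()
    }
}

// receiveWithTimeout bounds a single receive with a deadline.
func receiveWithTimeout(messages <-chan string, timeout time.Duration) (string, error) {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()
    return receive(ctx, messages)
}

// isTimeout reports whether err was caused by the deadline expiring.
func isTimeout(err error) bool {
    return errors.Is(err, context.DeadlineExceeded)
}

func main() {
    empty := make(chan string) // no message is ever sent on this channel
    if _, err := receiveWithTimeout(empty, shortTimeout); isTimeout(err) {
        fmt.Println("no message arrived before the deadline")
    }

    ready := make(chan string, 1)
    ready <- "hello"
    msg, err := receiveWithTimeout(ready, shortTimeout)
    fmt.Println(msg, err)
}
```

Distinguishing a timeout from other errors, as isTimeout does, lets a program treat an empty topic as a normal condition instead of a fatal one.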

How to use schema in consumer​


type testJSON struct {
    ID   int    `json:"id"`
    Name string `json:"name"`
}


var (
    exampleSchemaDef = "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
        "\"fields\":[{\"name\":\"ID\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}"
)


client, err := pulsar.NewClient(pulsar.ClientOptions{
    URL: "pulsar://localhost:6650",
})
if err != nil {
    log.Fatal(err)
}
defer client.Close()

var s testJSON

consumerJS := pulsar.NewJSONSchema(exampleSchemaDef, nil)
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
    Topic:                       "jsonTopic",
    SubscriptionName:            "sub-1",
    Schema:                      consumerJS,
    SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
})
if err != nil {
    log.Fatal(err)
}
defer consumer.Close()

msg, err := consumer.Receive(context.Background())
if err != nil {
    log.Fatal(err)
}
// Decode the message value into s using the consumer's schema
err = msg.GetSchemaValue(&s)
if err != nil {
    log.Fatal(err)
}

How to use Prometheus metrics in consumer​

This section demonstrates how to create a simple Pulsar consumer application that exposes Prometheus metrics via HTTP.

  1. Write a simple consumer application.

// Create a Pulsar client
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
if err != nil {
log.Fatal(err)
}

defer client.Close()

// Start a separate goroutine for Prometheus metrics
// In this case, Prometheus metrics can be accessed via http://localhost:2112/metrics
go func() {
    prometheusPort := 2112
    log.Printf("Starting Prometheus metrics at http://localhost:%v/metrics\n", prometheusPort)
    http.Handle("/metrics", promhttp.Handler())
    // Use a goroutine-local err so the outer err variable is not shared across goroutines
    if err := http.ListenAndServe(":"+strconv.Itoa(prometheusPort), nil); err != nil {
        log.Fatal(err)
    }
}()

// Create a consumer
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "topic-1",
SubscriptionName: "sub-1",
Type: pulsar.Shared,
})
if err != nil {
log.Fatal(err)
}

defer consumer.Close()

ctx := context.Background()

// Write your business logic here
// In this case, you build a simple Web server. You can consume messages by requesting http://localhost:8083/consume
webPort := 8083
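http.HandleFunc("/consume", func(w http.ResponseWriter, r *http.Request) {
    msg, err := consumer.Receive(ctx)
    if err != nil {
        // Report the failure to the caller instead of terminating the whole process
        log.Printf("Failed to receive message: %v", err)
        http.Error(w, "failed to receive message", http.StatusInternalServerError)
        return
    }
    log.Printf("Received message msgId: %v -- content: '%s'\n", msg.ID(), string(msg.Payload()))
    fmt.Fprintf(w, "Received message msgId: %v -- content: '%s'\n", msg.ID(), string(msg.Payload()))
    consumer.Ack(msg)
})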

err = http.ListenAndServe(":"+strconv.Itoa(webPort), nil)
if err != nil {
log.Fatal(err)
}

  2. To scrape metrics from applications, configure a locally running Prometheus instance using a configuration file (prometheus.yml).

scrape_configs:
  - job_name: pulsar-client-go-metrics
    scrape_interval: 10s
    static_configs:
      - targets:
          - localhost:2112

Now you can query Pulsar client metrics on Prometheus.

Consumer configuration​

Name | Description | Default
Topic | Specifies the topic this consumer will subscribe to. Either a topic, a list of topics, or a topics pattern is required when subscribing. |
Topics | Specifies a list of topics this consumer will subscribe to. Either a topic, a list of topics, or a topics pattern is required when subscribing. |
TopicsPattern | Specifies a regular expression to subscribe to multiple topics under the same namespace. Either a topic, a list of topics, or a topics pattern is required when subscribing. |
AutoDiscoveryPeriod | Specifies the interval at which to poll for new partitions or new topics if using a TopicsPattern. |
SubscriptionName | Specifies the subscription name for this consumer. This argument is required when subscribing. |
Name | Sets the consumer name. |
Properties | Attaches a set of application-defined properties to the consumer. These properties are visible in the topic stats. |
Type | Selects the subscription type to be used when subscribing to the topic. | Exclusive
SubscriptionInitialPosition | The initial position at which the cursor is set when subscribing. | Latest
DLQ | Configuration for the Dead Letter Queue consumer policy. | no DLQ
MessageChannel | Sets a MessageChannel for the consumer. When a message is received, it is pushed to the channel for consumption. |
ReceiverQueueSize | Sets the size of the consumer receive queue. | 1000
NackRedeliveryDelay | The delay after which to redeliver the messages that failed to be processed. | 1min
ReadCompacted | If enabled, the consumer reads messages from the compacted topic rather than reading the full message backlog of the topic. | false
ReplicateSubscriptionState | Marks the subscription as replicated to keep it in sync across clusters. | false
KeySharedPolicy | Configuration for the Key Shared consumer policy. |
RetryEnable | Automatically retries sending messages to the default-filled DLQPolicy topics. | false
Interceptors | A chain of interceptors. These interceptors are called at the points defined in the ConsumerInterceptor interface. |
MaxReconnectToBroker | Sets the maximum number of reconnectToBroker retries. | unlimited
Schema | Sets a custom schema type by passing an implementation of Schema. | []byte

Readers​

Pulsar readers process messages from Pulsar topics. Readers are different from consumers because with readers you need to explicitly specify which message in the stream you want to begin with (consumers, on the other hand, automatically begin with the most recent unacked message). You can configure Go readers using a ReaderOptions object. Here's an example:


reader, err := client.CreateReader(pulsar.ReaderOptions{
Topic: "topic-1",
StartMessageID: pulsar.EarliestMessageID(),
})
if err != nil {
log.Fatal(err)
}
defer reader.Close()

Reader operations​

Pulsar Go readers have the following methods available:

Method | Description | Return type
Topic() | Returns the reader's topic | string
Next(context.Context) | Receives the next message on the topic (analogous to the Receive method for consumers). This method blocks until a message is available. | (Message, error)
HasNext() | Checks whether there is any message available to read from the current position | bool
Close() | Closes the reader, disabling its ability to receive messages from the broker | error
Seek(MessageID) | Resets the subscription associated with this reader to a specific message ID | error
SeekByTime(time time.Time) | Resets the subscription associated with this reader to a specific message publish time | error

Reader example​

Here's an example usage of a Go reader that uses the Next() method to process incoming messages:


import (
"context"
"fmt"
"log"

"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
if err != nil {
log.Fatal(err)
}

defer client.Close()

reader, err := client.CreateReader(pulsar.ReaderOptions{
Topic: "topic-1",
StartMessageID: pulsar.EarliestMessageID(),
})
if err != nil {
log.Fatal(err)
}
defer reader.Close()

for reader.HasNext() {
msg, err := reader.Next(context.Background())
if err != nil {
log.Fatal(err)
}

fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))
}
}

In the example above, the reader begins reading from the earliest available message (specified by pulsar.EarliestMessageID()). The reader can also begin reading from the latest message (pulsar.LatestMessageID()) or from some other message ID specified by bytes using the DeserializeMessageID function, which takes a byte slice and returns a MessageID object and an error. Here's an example:


// lastSavedID holds the bytes of the last saved message ID, read from an external store
var lastSavedID []byte

startID, err := pulsar.DeserializeMessageID(lastSavedID)
if err != nil {
    log.Fatal(err)
}

reader, err := client.CreateReader(pulsar.ReaderOptions{
    Topic:          "my-golang-topic",
    StartMessageID: startID,
})
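The example above leaves open how the message ID bytes reach the external store. One simple possibility is a local file. The sketch below saves a byte slice to a file and reads it back, using only the standard library. The helpers saveCheckpoint, loadCheckpoint, and roundTrip are hypothetical names, and the bytes are placeholders: in a real program they would come from serializing a message ID (assumed here to be msg.ID().Serialize()), and the restored bytes would be passed to pulsar.DeserializeMessageID.

```go
package main

import (
    "fmt"
    "log"
    "os"
    "path/filepath"
)

// saveCheckpoint writes the serialized message ID bytes to path.
func saveCheckpoint(path string, id []byte) error {
    return os.WriteFile(path, id, 0o600)
}

// loadCheckpoint reads the serialized message ID bytes back from path.
func loadCheckpoint(path string) ([]byte, error) {
    return os.ReadFile(path)
}

// roundTrip saves id to a file in a temporary directory, reads it back,
// and removes the directory again.
func roundTrip(id []byte) ([]byte, error) {
    dir, err := os.MkdirTemp("", "pulsar-checkpoint")
    if err != nil {
        return nil, err
    }
    defer os.RemoveAll(dir)

    path := filepath.Join(dir, "last-message-id")
    if err := saveCheckpoint(path, id); err != nil {
        return nil, err
    }
    return loadCheckpoint(path)
}

func main() {
    // Placeholder bytes standing in for a serialized message ID.
    saved := []byte{0x08, 0x01, 0x10, 0x02}

    restored, err := roundTrip(saved)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("restored %d bytes\n", len(restored))
}
```

A file is only one option; a database row or a key-value store works the same way, as long as the bytes come back unchanged.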

How to use reader to read specific message​


client, err := pulsar.NewClient(pulsar.ClientOptions{
    URL: lookupURL, // lookupURL is assumed to be defined elsewhere
})

if err != nil {
    log.Fatal(err)
}
defer client.Close()

topic := "topic-1"
ctx := context.Background()

// create producer
producer, err := client.CreateProducer(pulsar.ProducerOptions{
    Topic:           topic,
    DisableBatching: true,
})
if err != nil {
    log.Fatal(err)
}
defer producer.Close()

// send 10 messages
msgIDs := [10]pulsar.MessageID{}
for i := 0; i < 10; i++ {
    msgID, err := producer.Send(ctx, &pulsar.ProducerMessage{
        Payload: []byte(fmt.Sprintf("hello-%d", i)),
    })
    if err != nil {
        log.Fatal(err)
    }
    msgIDs[i] = msgID
}

// create reader on 5th message (not included)
reader, err := client.CreateReader(pulsar.ReaderOptions{
    Topic:          topic,
    StartMessageID: msgIDs[4],
})

if err != nil {
    log.Fatal(err)
}
defer reader.Close()

// receive the remaining 5 messages
for i := 5; i < 10; i++ {
    msg, err := reader.Next(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(string(msg.Payload()))
}

// create reader on 5th message (included)
readerInclusive, err := client.CreateReader(pulsar.ReaderOptions{
    Topic:                   topic,
    StartMessageID:          msgIDs[4],
    StartMessageIDInclusive: true,
})

if err != nil {
    log.Fatal(err)
}
defer readerInclusive.Close()
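The difference between the two readers comes down to whether the start message itself is returned. The sketch below models that with a slice standing in for the topic, so it runs without a broker. The function readFrom is a hypothetical model of the positioning rule, not a Pulsar API.

```go
package main

import "fmt"

// readFrom models reader positioning on a topic represented as a slice.
// With inclusive=false, reading starts after the message at index start;
// with inclusive=true, it starts at that message.
func readFrom(topic []string, start int, inclusive bool) []string {
    first := start + 1
    if inclusive {
        first = start
    }
    // Clamp to the valid range so out-of-range positions yield an empty result.
    if first < 0 {
        first = 0
    }
    if first > len(topic) {
        first = len(topic)
    }
    return topic[first:]
}

func main() {
    var topic []string
    for i := 0; i < 10; i++ {
        topic = append(topic, fmt.Sprintf("hello-%d", i))
    }

    fmt.Println(readFrom(topic, 4, false)) // starts at hello-5
    fmt.Println(readFrom(topic, 4, true))  // starts at hello-4
}
```

So with StartMessageID set to the fifth message, the default reader yields five messages and the inclusive reader yields six.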

Reader configuration​

Name | Description | Default
Topic | Specifies the topic this reader will read from. This argument is required when constructing the reader. |
Name | Sets the reader name. |
Properties | Attaches a set of application-defined properties to the reader. These properties are visible in the topic stats. |
StartMessageID | Sets the initial reader position by specifying a message ID. |
StartMessageIDInclusive | If true, the reader starts at the StartMessageID, included. By default, the reader starts from the "next" message. | false
MessageChannel | Sets a MessageChannel for the reader. When a message is received, it is pushed to the channel for consumption. |
ReceiverQueueSize | Sets the size of the reader receive queue. | 1000
SubscriptionRolePrefix | Sets the subscription role prefix. | "reader"
ReadCompacted | If enabled, the reader reads messages from the compacted topic rather than reading the full message backlog of the topic. ReadCompacted can only be enabled when reading from a persistent topic. | false

Messages​

The Pulsar Go client provides a ProducerMessage struct that you can use to construct messages to produce on Pulsar topics. Here's an example message:


msg := pulsar.ProducerMessage{
    Payload: []byte("Here is some message data"),
    Key:     "message-key",
    Properties: map[string]string{
        "foo": "bar",
    },
    EventTime:           time.Now(),
    ReplicationClusters: []string{"cluster1", "cluster3"},
}

// Send takes a context and a pointer to the message
if _, err := producer.Send(context.Background(), &msg); err != nil {
    log.Fatalf("Could not publish message due to: %v", err)
}

The following parameters are available for ProducerMessage objects:

Parameter | Description
Payload | The actual data payload of the message
Value | The value of a schema message (interface{}). Value and Payload are mutually exclusive.
Key | The optional key associated with the message (particularly useful for things like topic compaction)
OrderingKey | Sets the ordering key of the message
Properties | A key-value map (both keys and values must be strings) for any application-specific metadata attached to the message
EventTime | The timestamp associated with the message
ReplicationClusters | The clusters to which this message will be replicated. Pulsar brokers handle message replication automatically; you should only change this setting if you want to override the broker default.
SequenceID | Sets the sequence ID to assign to the current message
DeliverAfter | Requests delivery of the message only after the specified relative delay
DeliverAt | Delivers the message only at or after the specified absolute timestamp

TLS encryption and authentication​

In order to use TLS encryption, you'll need to configure your client to do so:

  • Use pulsar+ssl URL type
  • Set TLSTrustCertsFilePath to the path to the TLS certs used by your client and the Pulsar broker
  • Configure Authentication option

Here's an example:


opts := pulsar.ClientOptions{
    URL:                   "pulsar+ssl://my-cluster.com:6651",
    TLSTrustCertsFilePath: "/path/to/certs/my-cert.csr",
    Authentication:        pulsar.NewAuthenticationTLS("my-cert.pem", "my-key.pem"),
}

OAuth2 authentication​

To use OAuth2 authentication, configure your client with an OAuth2 authentication provider. This example shows how to configure OAuth2 authentication.


oauth := pulsar.NewAuthenticationOAuth2(map[string]string{
"type": "client_credentials",
"issuerUrl": "https://dev-kt-aa9ne.us.auth0.com",
"audience": "https://dev-kt-aa9ne.us.auth0.com/api/v2/",
"privateKey": "/path/to/privateKey",
"clientId": "0Xx...Yyxeny",
})
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://my-cluster:6650",
Authentication: oauth,
})