Pulsar Go client
Tips: The CGo client has been deprecated since version 2.7.0.
You can use Pulsar Go client to create Pulsar producers, consumers, and readers in Go (aka Golang).
API docs are available as well. For standard API docs, consult the Godoc.
Installation
Install go package
You can get the pulsar library by using go get or use it with go module.
Download the Go client library to your local environment:
$ go get -u "github.com/apache/pulsar-client-go/pulsar"
Once installed locally, you can import it into your project:
import "github.com/apache/pulsar-client-go/pulsar"
Use with go module:
$ mkdir test_dir && cd test_dir
Write a sample script in the test_dir directory (such as test_example.go) and write package main at the beginning of the file.
$ go mod init test_dir
$ go mod tidy && go mod download
$ go build test_example.go
$ ./test_example
Connection URLs
To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL.
Pulsar protocol URLs are assigned to specific clusters, use the pulsar scheme, and have a default port of 6650. Here's an example for localhost:
pulsar://localhost:6650
If you have multiple brokers, you can set the URL as below.
pulsar://localhost:6650,localhost:6651,localhost:6652
A URL for a production Pulsar cluster may look something like this:
pulsar://pulsar.us-west.example.com:6650
If you're using TLS authentication, the URL will look something like this:
pulsar+ssl://pulsar.us-west.example.com:6651
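To make the URL shapes above concrete, here is a minimal standard-library sketch that splits a service URL into its scheme and broker addresses. It is not how the Pulsar client parses URLs; the function name parseServiceURL is hypothetical, and the fallback port 6651 for pulsar+ssl is an assumption taken from the TLS example above (only 6650 is stated as a default).

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// defaultPorts maps each scheme to a fallback port.
// 6650 comes from the text above; 6651 for pulsar+ssl is an assumption.
var defaultPorts = map[string]string{
	"pulsar":     "6650",
	"pulsar+ssl": "6651",
}

// parseServiceURL (hypothetical helper) returns the scheme and the list of
// host:port broker addresses found in a comma-separated service URL.
func parseServiceURL(raw string) (string, []string, error) {
	scheme, rest, ok := strings.Cut(raw, "://")
	if !ok {
		return "", nil, fmt.Errorf("missing scheme in %q", raw)
	}
	port, known := defaultPorts[scheme]
	if !known {
		return "", nil, fmt.Errorf("unsupported scheme %q", scheme)
	}
	var hosts []string
	for _, h := range strings.Split(rest, ",") {
		h = strings.TrimSpace(h)
		if h == "" {
			return "", nil, fmt.Errorf("empty broker address in %q", raw)
		}
		// If the address has no port, append the scheme's fallback port.
		if _, _, err := net.SplitHostPort(h); err != nil {
			h = net.JoinHostPort(h, port)
		}
		hosts = append(hosts, h)
	}
	return scheme, hosts, nil
}

func main() {
	scheme, hosts, err := parseServiceURL("pulsar://localhost:6650,localhost:6651,localhost:6652")
	if err != nil {
		fmt.Println("invalid URL:", err)
		return
	}
	fmt.Println(scheme, hosts)
}
```

A check like this can be useful for validating configuration before it is handed to the client, but the client performs its own parsing.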
Create a client
In order to interact with Pulsar, you'll first need a Client object. You can create a client object using the NewClient function, passing in a ClientOptions object (more on configuration below). Here's an example:
package main

import (
	"log"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:               "pulsar://localhost:6650",
		OperationTimeout:  30 * time.Second,
		ConnectionTimeout: 30 * time.Second,
	})
	if err != nil {
		log.Fatalf("Could not instantiate Pulsar client: %v", err)
	}
	defer client.Close()
}
If you have multiple brokers, you can create a client object as below.
package main

import (
	"log"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:               "pulsar://localhost:6650,localhost:6651,localhost:6652",
		OperationTimeout:  30 * time.Second,
		ConnectionTimeout: 30 * time.Second,
	})
	if err != nil {
		log.Fatalf("Could not instantiate Pulsar client: %v", err)
	}
	defer client.Close()
}
The following configurable parameters are available for Pulsar clients:
Name | Description | Default |
---|---|---|
URL | Configure the service URL for the Pulsar service. If you have multiple brokers, you can set multiple Pulsar cluster addresses for a client. This parameter is required. | None |
ConnectionTimeout | Timeout for the establishment of a TCP connection | 30s |
OperationTimeout | Set the operation timeout. Producer-create, subscribe and unsubscribe operations will be retried until this interval, after which the operation will be marked as failed | 30s |
Authentication | Configure the authentication provider. Example: Authentication: pulsar.NewAuthenticationTLS("my-cert.pem", "my-key.pem") | no authentication |
TLSTrustCertsFilePath | Set the path to the trusted TLS certificate file | |
TLSAllowInsecureConnection | Configure whether the Pulsar client accepts an untrusted TLS certificate from the broker | false |
TLSValidateHostname | Configure whether the Pulsar client verifies the validity of the host name from the broker | false |
ListenerName | Configure the net model for VPC users to connect to the Pulsar broker | |
MaxConnectionsPerBroker | Max number of connections to a single broker that is kept in the pool | 1 |
CustomMetricsLabels | Add custom labels to all the metrics reported by this client instance | |
Logger | Configure the logger used by the client | logrus.StandardLogger |
Producers
Pulsar producers publish messages to Pulsar topics. You can configure Go producers using a ProducerOptions object. Here's an example:
producer, err := client.CreateProducer(pulsar.ProducerOptions{
	Topic: "my-topic",
})
if err != nil {
	log.Fatal(err)
}
// Close the producer when the surrounding function returns.
defer producer.Close()

_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
	Payload: []byte("hello"),
})
if err != nil {
	fmt.Println("Failed to publish message", err)
} else {
	fmt.Println("Published message")
}
Producer operations
Pulsar Go producers have the following methods available:
Method | Description | Return type |
---|---|---|
Topic() | Fetches the producer's topic | string |
Name() | Fetches the producer's name | string |
Send(context.Context, *ProducerMessage) | Publishes a message to the producer's topic. This call blocks until the message is acknowledged by the Pulsar broker, or returns an error if the timeout set using SendTimeout in the producer's configuration is exceeded. | (MessageID, error) |
SendAsync(context.Context, *ProducerMessage, func(MessageID, *ProducerMessage, error)) | Sends a message asynchronously. The callback receives the message ID, the message, and any error once the message is acknowledged by the Pulsar broker or publishing fails. | |
LastSequenceID() | Gets the last sequence ID that was published by this producer. This represents either the automatically assigned or custom sequence ID (set on the ProducerMessage) that was published and acknowledged by the broker. | int64 |
Flush() | Flushes all the messages buffered in the client and waits until all messages have been successfully persisted. | error |
Close() | Closes the producer and releases all resources allocated to it. After Close() is called, no more messages are accepted from the publisher. This method blocks until all pending publish requests have been persisted by Pulsar. If an error occurs, pending writes are not retried. | |
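The callback style of SendAsync in the table above can be illustrated with a toy model. The sketch below does not use the Pulsar client: toyProducer and publishAll are hypothetical names, and the in-memory "producer" only mimics the shape of an asynchronous send so that the pattern of waiting for all callbacks is visible.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// toyProducer is a hypothetical in-memory stand-in for a producer.
// It is NOT the Pulsar client; it only mimics the callback shape of SendAsync.
type toyProducer struct {
	lastID int64 // last assigned sequence ID, updated atomically
}

// SendAsync returns immediately and reports the result through the callback.
func (p *toyProducer) SendAsync(payload []byte, callback func(id int64, payload []byte, err error)) {
	go func() {
		id := atomic.AddInt64(&p.lastID, 1)
		callback(id, payload, nil)
	}()
}

// publishAll sends every payload asynchronously, waits for all callbacks,
// and returns the number of sends that completed without error.
func publishAll(p *toyProducer, payloads [][]byte) int {
	var wg sync.WaitGroup
	var acked int64
	for _, pl := range payloads {
		wg.Add(1)
		p.SendAsync(pl, func(id int64, _ []byte, err error) {
			defer wg.Done()
			if err == nil {
				atomic.AddInt64(&acked, 1)
			}
		})
	}
	wg.Wait()
	return int(atomic.LoadInt64(&acked))
}

func main() {
	p := &toyProducer{}
	n := publishAll(p, [][]byte{[]byte("a"), []byte("b"), []byte("c")})
	fmt.Println("acknowledged:", n)
}
```

With the real client, Flush() from the table above plays a similar role to the wait group here: it waits until buffered messages have been persisted.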
Producer Example
How to use message router in producer
// serviceURL is assumed to be defined elsewhere, for example "pulsar://localhost:6650".
client, err := pulsar.NewClient(pulsar.ClientOptions{
	URL: serviceURL,
})
if err != nil {
	log.Fatal(err)
}
defer client.Close()

// Only subscribe on the specific partition
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
	Topic:            "my-partitioned-topic-partition-2",
	SubscriptionName: "my-sub",
})
if err != nil {
	log.Fatal(err)
}
defer consumer.Close()

producer, err := client.CreateProducer(pulsar.ProducerOptions{
	Topic: "my-partitioned-topic",
	// Route every message to partition 2, the one the consumer above reads.
	MessageRouter: func(msg *pulsar.ProducerMessage, tm pulsar.TopicMetadata) int {
		fmt.Println("Routing message ", msg, " -- Partitions: ", tm.NumPartitions())
		return 2
	},
})
if err != nil {
	log.Fatal(err)
}
defer producer.Close()
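The router above always returns partition 2. A router could instead derive the partition from a message key. The following standard-library sketch shows one way to do that with a Java-style string hash; javaStringHash and partitionForKey are hypothetical helpers written for illustration, not functions exported by the Pulsar client, and the client's own hashing may differ in detail.

```go
package main

import "fmt"

// javaStringHash computes h = 31*h + c over the bytes of s, the same
// recurrence as Java's String.hashCode for ASCII input. The result is kept
// unsigned here, so it wraps instead of going negative for long keys.
func javaStringHash(s string) uint32 {
	var h uint32
	for i := 0; i < len(s); i++ {
		h = 31*h + uint32(s[i])
	}
	return h
}

// partitionForKey (hypothetical helper) maps a key to a partition index
// in the range [0, numPartitions).
func partitionForKey(key string, numPartitions uint32) int {
	if numPartitions == 0 {
		return 0
	}
	return int(javaStringHash(key) % numPartitions)
}

func main() {
	for _, key := range []string{"abc", "order-17", "user-42"} {
		fmt.Printf("key %q -> partition %d\n", key, partitionForKey(key, 4))
	}
}
```

Inside a MessageRouter function, the same idea would presumably use the message's key and tm.NumPartitions() as inputs, so that all messages with the same key land on the same partition.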
How to use schema interface in producer
type testJSON struct {
	ID   int    `json:"id"`
	Name string `json:"name"`
}

var (
	exampleSchemaDef = "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
		"\"fields\":[{\"name\":\"ID\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}"
)

client, err := pulsar.NewClient(pulsar.ClientOptions{
	URL: "pulsar://localhost:6650",
})
if err != nil {
	log.Fatal(err)
}
defer client.Close()

properties := make(map[string]string)
properties["pulsar"] = "hello"
jsonSchemaWithProperties := pulsar.NewJSONSchema(exampleSchemaDef, properties)
producer, err := client.CreateProducer(pulsar.ProducerOptions{
	Topic:  "jsonTopic",
	Schema: jsonSchemaWithProperties,
})
if err != nil {
	log.Fatal(err)
}
defer producer.Close()

// Value is serialized with the producer's schema before it is sent.
_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
	Value: &testJSON{
		ID:   100,
		Name: "pulsar",
	},
})
if err != nil {
	log.Fatal(err)
}
How to use relative delay in producer
client, err := pulsar.NewClient(pulsar.ClientOptions{
	URL: "pulsar://localhost:6650",
})
if err != nil {
	log.Fatal(err)
}
defer client.Close()

// newTopicName is assumed to be a helper, defined elsewhere, that returns a unique topic name.
topicName := newTopicName()
producer, err := client.CreateProducer(pulsar.ProducerOptions{
	Topic:           topicName,
	DisableBatching: true,
})
if err != nil {
	log.Fatal(err)
}
defer producer.Close()

consumer, err := client.Subscribe(pulsar.ConsumerOptions{
	Topic:            topicName,
	SubscriptionName: "subName",
	Type:             pulsar.Shared,
})
if err != nil {
	log.Fatal(err)
}
defer consumer.Close()

// Ask the broker to deliver the message 3 seconds after it is published.
ID, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
	Payload:      []byte("test"),
	DeliverAfter: 3 * time.Second,
})
if err != nil {
	log.Fatal(err)
}
fmt.Println(ID)

// The message is not yet deliverable, so a 1-second receive is expected to time out.
ctx, canc := context.WithTimeout(context.Background(), 1*time.Second)
msg, err := consumer.Receive(ctx)
canc()
if err == nil {
	log.Fatalf("expected a timeout, but received: %s", msg.Payload())
}

// A 5-second receive outlasts the 3-second delay and should return the message.
ctx, canc = context.WithTimeout(context.Background(), 5*time.Second)
msg, err = consumer.Receive(ctx)
canc()
if err != nil {
	log.Fatal(err)
}
fmt.Println(string(msg.Payload()))
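The interplay between a delivery delay and a receive timeout can be tried without a broker. The sketch below is a standard-library model only: a buffered channel stands in for the subscription, time.AfterFunc stands in for the delayed delivery, and receive and delayedDemo are hypothetical names. It shows why a receive whose timeout is shorter than the delay fails while a longer one succeeds.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// receive waits for one message or for the context to expire, whichever comes first.
func receive(ctx context.Context, ch <-chan string) (string, error) {
	select {
	case m := <-ch:
		return m, nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// delayedDemo makes a message available after delayMs milliseconds, then
// tries to receive it with a short timeout and again with a long one.
// It reports whether the first attempt timed out, plus the second result.
func delayedDemo(delayMs, shortMs, longMs int) (bool, string, error) {
	ch := make(chan string, 1)
	timer := time.AfterFunc(time.Duration(delayMs)*time.Millisecond, func() { ch <- "test" })
	defer timer.Stop()

	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(shortMs)*time.Millisecond)
	_, err := receive(ctx, ch)
	cancel()
	timedOut := errors.Is(err, context.DeadlineExceeded)

	ctx, cancel = context.WithTimeout(context.Background(), time.Duration(longMs)*time.Millisecond)
	defer cancel()
	msg, err := receive(ctx, ch)
	return timedOut, msg, err
}

func main() {
	// Delay of 300 ms, first wait of 100 ms, second wait of 1 s.
	timedOut, msg, err := delayedDemo(300, 100, 1000)
	fmt.Println("first attempt timed out:", timedOut)
	fmt.Println("second attempt:", msg, err)
}
```

The durations are scaled down from the 3-second delay used above so the sketch finishes quickly; the ordering of the three values is what matters.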
How to use Prometheus metrics in producer
Pulsar Go client registers client metrics using Prometheus. This section demonstrates how to create a simple Pulsar producer application that exposes Prometheus metrics via HTTP.
- Write a simple producer application.
// Create a Pulsar client
client, err := pulsar.NewClient(pulsar.ClientOptions{
	URL: "pulsar://localhost:6650",
})
if err != nil {
	log.Fatal(err)
}
defer client.Close()

// Start a separate goroutine for Prometheus metrics
// In this case, Prometheus metrics can be accessed via http://localhost:2112/metrics
go func() {
	prometheusPort := 2112
	log.Printf("Starting Prometheus metrics at http://localhost:%v/metrics\n", prometheusPort)
	http.Handle("/metrics", promhttp.Handler())
	// Use a goroutine-local err so the outer err variable is not shared across goroutines.
	err := http.ListenAndServe(":"+strconv.Itoa(prometheusPort), nil)
	if err != nil {
		log.Fatal(err)
	}
}()

// Create a producer
producer, err := client.CreateProducer(pulsar.ProducerOptions{
	Topic: "topic-1",
})
if err != nil {
	log.Fatal(err)
}
defer producer.Close()

ctx := context.Background()

// Write your business logic here
// In this case, you build a simple Web server. You can produce messages by requesting http://localhost:8082/produce
webPort := 8082
http.HandleFunc("/produce", func(w http.ResponseWriter, r *http.Request) {
	msgId, err := producer.Send(ctx, &pulsar.ProducerMessage{
		Payload: []byte("hello world"),
	})
	if err != nil {
		// Report the failure to the caller instead of terminating the server.
		log.Printf("Failed to publish message: %v", err)
		http.Error(w, "failed to publish message", http.StatusInternalServerError)
		return
	}
	log.Printf("Published message: %v", msgId)
	fmt.Fprintf(w, "Published message: %v", msgId)
})

err = http.ListenAndServe(":"+strconv.Itoa(webPort), nil)
if err != nil {
	log.Fatal(err)
}
- To scrape metrics from applications, configure a locally running Prometheus instance using a configuration file (prometheus.yml).
scrape_configs:
  - job_name: pulsar-client-go-metrics
    scrape_interval: 10s
    static_configs:
      - targets:
          - localhost:2112
Now you can query Pulsar client metrics on Prometheus.
Producer configuration
Name | Description | Default |
---|---|---|
Topic | Topic specifies the topic this producer will publish to. This argument is required when constructing the producer. | |
Name | Name specifies a name for the producer. If not assigned, the system generates a globally unique name, which can be accessed with Producer.Name(). | |
Properties | Properties attaches a set of application-defined properties to the producer. These properties are visible in the topic stats. | |
SendTimeout | SendTimeout sets the timeout for a message that is not acknowledged by the server | 30s |
DisableBlockIfQueueFull | DisableBlockIfQueueFull controls whether Send and SendAsync block if the producer's message queue is full | false |
MaxPendingMessages | MaxPendingMessages sets the maximum size of the queue holding the messages pending to receive an acknowledgment from the broker. | |
HashingScheme | HashingScheme changes the hashing scheme used to choose the partition on which to publish a particular message. | JavaStringHash |
CompressionType | CompressionType sets the compression type for the producer. | not compressed |
CompressionLevel | Defines the desired compression level. Options: Default, Faster and Better | Default |
MessageRouter | MessageRouter sets a custom message routing policy by passing an implementation of MessageRouter | |
DisableBatching | DisableBatching controls whether automatic batching of messages is enabled for the producer. | false |
BatchingMaxPublishDelay | BatchingMaxPublishDelay sets the time period within which the messages sent will be batched | 1ms |
BatchingMaxMessages | BatchingMaxMessages sets the maximum number of messages permitted in a batch. | 1000 |
BatchingMaxSize | BatchingMaxSize sets the maximum number of bytes permitted in a batch. | 128KB |
Schema | Schema sets a custom schema type by passing an implementation of Schema | []byte |
Interceptors | A chain of interceptors. These interceptors are called at some points defined in the ProducerInterceptor interface. | None |
MaxReconnectToBroker | MaxReconnectToBroker sets the maximum number of retries for reconnectToBroker | unlimited |
BatcherBuilderType | BatcherBuilderType sets the batch builder type. This is used to create a batch container when batching is enabled. Options: DefaultBatchBuilder and KeyBasedBatchBuilder | DefaultBatchBuilder |
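To show how the count and size limits in the table might interact, here is a toy batcher written only with the standard library. It is a simplified model under stated assumptions, not the client's batching code: toyBatcher and batchLimits are hypothetical names, a batch is assumed to be flushed as soon as either limit is reached, 128KB is assumed to mean 128*1024 bytes, and the time-based BatchingMaxPublishDelay limit is left out.

```go
package main

import "fmt"

// batchLimits mirrors two of the options from the table above.
type batchLimits struct {
	maxMessages int // corresponds to BatchingMaxMessages
	maxBytes    int // corresponds to BatchingMaxSize
}

// toyBatcher (hypothetical) accumulates payloads until a limit is reached.
type toyBatcher struct {
	limits  batchLimits
	count   int // messages in the current batch
	bytes   int // payload bytes in the current batch
	flushed int // number of batches flushed so far
}

// add appends a payload to the current batch and returns true if doing so
// reached a limit and the batch was flushed.
func (b *toyBatcher) add(payload []byte) bool {
	b.count++
	b.bytes += len(payload)
	if b.count >= b.limits.maxMessages || b.bytes >= b.limits.maxBytes {
		b.count, b.bytes = 0, 0
		b.flushed++
		return true
	}
	return false
}

func main() {
	// Defaults from the table; 128KB is assumed to be 128*1024 bytes.
	b := &toyBatcher{limits: batchLimits{maxMessages: 1000, maxBytes: 128 * 1024}}
	payload := make([]byte, 512)
	for i := 0; i < 2500; i++ {
		b.add(payload)
	}
	fmt.Println("batches flushed:", b.flushed, "messages still buffered:", b.count)
}
```

With 512-byte payloads the size limit is reached before the message-count limit, which illustrates that whichever limit is hit first decides when a batch goes out in this model.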