Pulsar C# client
You can use the Pulsar C# client (DotPulsar) to create Pulsar producers and consumers in C#. All the methods in the producer, consumer, and reader of a C# client are thread-safe. The official documentation for DotPulsar is available here.
Installation
You can install the Pulsar C# client library either through the dotnet CLI or through the Visual Studio. This section describes how to install the Pulsar C# client library through the dotnet CLI. For information about how to install the Pulsar C# client library through the Visual Studio, see here.
Prerequisites
Install the .NET Core SDK, which provides the dotnet command-line tool. Starting in Visual Studio 2017, the dotnet CLI is automatically installed with any .NET Core related workloads.
Procedures
To install the Pulsar C# client library, follow these steps:
- 
Create a project. - 
Create a folder for the project. 
- 
Open a terminal window and switch to the new folder. 
- 
Create the project using the following command. 
 dotnet new console
- 
Use dotnet runto test that the app has been created properly.
 
- 
- 
Add the DotPulsar NuGet package. - 
Use the following command to install the DotPulsarpackage.
 dotnet add package DotPulsar
- 
After the command completes, open the .csprojfile to see the added reference.
 <ItemGroup>
 <PackageReference Include="DotPulsar" Version="2.0.1" />
 </ItemGroup>
 
- 
Client
This section describes some configuration examples for the Pulsar C# client.
Create client
This example shows how to create a Pulsar C# client connected to localhost.
using DotPulsar;
var client = PulsarClient.Builder().Build();
To create a Pulsar C# client by using the builder, you can specify the following options.
| Option | Description | Default | 
|---|---|---|
| ServiceUrl | Set the service URL for the Pulsar cluster. | pulsar://localhost:6650 | 
| RetryInterval | Set the time to wait before retrying an operation or a reconnection. | 3s | 
Create producer
This section describes how to create a producer.
- 
Create a producer by using the builder. 
 using DotPulsar;
 using DotPulsar.Extensions;
 var producer = client.NewProducer())
 .Topic("persistent://public/default/mytopic")
 .Create();
- 
Create a producer without using the builder. 
 using DotPulsar;
 var options = new ProducerOptions<byte[]>("persistent://public/default/mytopic", Schema.ByteArray);
 var producer = client.CreateProducer(options);
Create consumer
This section describes how to create a consumer.
- 
Create a consumer by using the builder. 
 using DotPulsar;
 using DotPulsar.Extensions;
 var consumer = client.NewConsumer()
 .SubscriptionName("MySubscription")
 .Topic("persistent://public/default/mytopic")
 .Create();
- 
Create a consumer without using the builder. 
 using DotPulsar;
 var options = new ConsumerOptions<byte[]>("MySubscription", "persistent://public/default/mytopic", Schema.ByteArray);
 var consumer = client.CreateConsumer(options);
Create reader
This section describes how to create a reader.
- 
Create a reader by using the builder. 
 using DotPulsar;
 using DotPulsar.Extensions;
 var reader = client.NewReader()
 .StartMessageId(MessageId.Earliest)
 .Topic("persistent://public/default/mytopic")
 .Create();
- 
Create a reader without using the builder. 
 using DotPulsar;
 var options = new ReaderOptions<byte[]>(MessageId.Earliest, "persistent://public/default/mytopic", Schema.ByteArray);
 var reader = client.CreateReader(options);
Configure encryption policies
The Pulsar C# client supports four kinds of encryption policies:
- EnforceUnencrypted: always use unencrypted connections.
- EnforceEncrypted: always use encrypted connections)
- PreferUnencrypted: use unencrypted connections, if possible.
- PreferEncrypted: use encrypted connections, if possible.
This example shows how to set the EnforceUnencrypted encryption policy.
using DotPulsar;
var client = PulsarClient.Builder()
                         .ConnectionSecurity(EncryptionPolicy.EnforceEncrypted)
                         .Build();
Configure authentication
Currently, the Pulsar C# client supports the TLS (Transport Layer Security) and JWT (JSON Web Token) authentication.
If you have followed Authentication using TLS, you get a certificate and a key. To use them from the Pulsar C# client, follow these steps:
- 
Create an unencrypted and password-less pfx file. 
 openssl pkcs12 -export -keypbe NONE -certpbe NONE -out admin.pfx -inkey admin.key.pem -in admin.cert.pem -passout pass:
- 
Use the admin.pfx file to create an X509Certificate2 and pass it to the Pulsar C# client. 
 using System.Security.Cryptography.X509Certificates;
 using DotPulsar;
 var clientCertificate = new X509Certificate2("admin.pfx");
 var client = PulsarClient.Builder()
 .AuthenticateUsingClientCertificate(clientCertificate)
 .Build();
Producer
A producer is a process that attaches to a topic and publishes messages to a Pulsar broker for processing. This section describes some configuration examples about the producer.
Send data
This example shows how to send data.
var data = Encoding.UTF8.GetBytes("Hello World");
await producer.Send(data);
Send messages with customized metadata
- 
Send messages with customized metadata by using the builder. 
 var messageId = await producer.NewMessage()
 .Property("SomeKey", "SomeValue")
 .Send(data);
- 
Send messages with customized metadata without using the builder. 
 var data = Encoding.UTF8.GetBytes("Hello World");
 var metadata = new MessageMetadata();
 metadata["SomeKey"] = "SomeValue";
 var messageId = await producer.Send(metadata, data));
Consumer
A consumer is a process that attaches to a topic through a subscription and then receives messages. This section describes some configuration examples about the consumer.
Receive messages
This example shows how a consumer receives messages from a topic.
await foreach (var message in consumer.Messages())
{
    Console.WriteLine("Received: " + Encoding.UTF8.GetString(message.Data.ToArray()));
}
Acknowledge messages
Messages can be acknowledged individually or cumulatively. For details about message acknowledgement, see acknowledgement.
- 
Acknowledge messages individually. 
 await consumer.Acknowledge(message);
- 
Acknowledge messages cumulatively. 
 await consumer.AcknowledgeCumulative(message);
Unsubscribe from topics
This example shows how a consumer unsubscribes from a topic.
await consumer.Unsubscribe();
Note
A consumer cannot be used and is disposed once the consumer unsubscribes from a topic.
Reader
A reader is actually just a consumer without a cursor. This means that Pulsar does not keep track of your progress and there is no need to acknowledge messages.
This example shows how a reader receives messages.
await foreach (var message in reader.Messages())
{
    Console.WriteLine("Received: " + Encoding.UTF8.GetString(message.Data.ToArray()));
}
Monitoring
This section describes how to monitor the producer, consumer, and reader state.
Monitor producer
The following table lists states available for the producer.
| State | Description | 
|---|---|
| Closed | The producer or the Pulsar client has been disposed. | 
| Connected | All is well. | 
| Disconnected | The connection is lost and attempts are being made to reconnect. | 
| Faulted | An unrecoverable error has occurred. | 
| PartiallyConnected | Some of the sub-producers are disconnected. | 
This example shows how to monitor the producer state.
private static async ValueTask Monitor(IProducer producer, CancellationToken cancellationToken)
{
    var state = ProducerState.Disconnected;
    while (!cancellationToken.IsCancellationRequested)
    {
        state = (await producer.StateChangedFrom(state, cancellationToken)).ProducerState;
        var stateMessage = state switch
        {
            ProducerState.Connected => $"The producer is connected",
            ProducerState.Disconnected => $"The producer is disconnected",
            ProducerState.Closed => $"The producer has closed",
            ProducerState.Faulted => $"The producer has faulted",
            ProducerState.PartiallyConnected => $"The producer is partially connected.",
            _ => $"The producer has an unknown state '{state}'"
        };
        Console.WriteLine(stateMessage);
        if (producer.IsFinalState(state))
            return;
    }
}
Monitor consumer state
The following table lists states available for the consumer.
| State | Description | 
|---|---|
| Active | All is well. | 
| Inactive | All is well. The subscription type is Failoverand you are not the active consumer. | 
| Closed | The consumer or the Pulsar client has been disposed. | 
| Disconnected | The connection is lost and attempts are being made to reconnect. | 
| Faulted | An unrecoverable error has occurred. | 
| ReachedEndOfTopic | No more messages are delivered. | 
| Unsubscribed | The consumer has unsubscribed. | 
This example shows how to monitor the consumer state.
private static async ValueTask Monitor(IConsumer consumer, CancellationToken cancellationToken)
{
    var state = ConsumerState.Disconnected;
    while (!cancellationToken.IsCancellationRequested)
    {
        state = (await consumer.StateChangedFrom(state, cancellationToken)).ConsumerState;
        var stateMessage = state switch
        {
            ConsumerState.Active => "The consumer is active",
            ConsumerState.Inactive => "The consumer is inactive",
            ConsumerState.Disconnected => "The consumer is disconnected",
            ConsumerState.Closed => "The consumer has closed",
            ConsumerState.ReachedEndOfTopic => "The consumer has reached end of topic",
            ConsumerState.Faulted => "The consumer has faulted",
            ConsumerState.Unsubscribed => "The consumer is unsubscribed.",
            _ => $"The consumer has an unknown state '{state}'"
        };
        Console.WriteLine(stateMessage);
        if (consumer.IsFinalState(state))
            return;
    }
}
Monitor reader state
The following table lists states available for the reader.
| State | Description | 
|---|---|
| Closed | The reader or the Pulsar client has been disposed. | 
| Connected | All is well. | 
| Disconnected | The connection is lost and attempts are being made to reconnect. | 
| Faulted | An unrecoverable error has occurred. | 
| ReachedEndOfTopic | No more messages are delivered. | 
This example shows how to monitor the reader state.
private static async ValueTask Monitor(IReader reader, CancellationToken cancellationToken)
{
    var state = ReaderState.Disconnected;
    while (!cancellationToken.IsCancellationRequested)
    {
        state = (await reader.StateChangedFrom(state, cancellationToken)).ReaderState;
        var stateMessage = state switch
        {
            ReaderState.Connected => "The reader is connected",
            ReaderState.Disconnected => "The reader is disconnected",
            ReaderState.Closed => "The reader has closed",
            ReaderState.ReachedEndOfTopic => "The reader has reached end of topic",
            ReaderState.Faulted => "The reader has faulted",
            _ => $"The reader has an unknown state '{state}'"
        };
        Console.WriteLine(stateMessage);
        if (reader.IsFinalState(state))
            return;
    }
}