OpenTelemetry Tracing for Pulsar Java Client
This document describes how to use OpenTelemetry distributed tracing with the Pulsar Java client.
Overview
The Pulsar Java client provides built-in support for OpenTelemetry distributed tracing. This allows you to:
- Trace message publishing from producer to broker
- Trace message consumption from broker to consumer
- Propagate trace context across services via message properties
- Extract trace context from external sources (e.g., HTTP requests)
- Create end-to-end traces across your distributed system
Features
Producer Tracing
Producer tracing creates spans for:
- send - Span starts when
send()orsendAsync()is called and completes when broker acknowledges receipt
Consumer Tracing
Consumer tracing creates spans for:
- process - Span starts when message is received and completes when message is acknowledged, negatively acknowledged, or ack timeout occurs
Trace Context Propagation
Trace context is automatically propagated using W3C TraceContext format:
traceparent- Contains trace ID, span ID, and trace flagstracestate- Contains vendor-specific trace information
Context is injected into and extracted from message properties, enabling seamless trace propagation across services.
Quick Start
1. Add Dependencies
The Pulsar client already includes OpenTelemetry API dependencies. You'll need to add the SDK and exporters:
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-otlp</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
2. Enable Tracing
There are three ways to enable tracing:
Option 1: Using OpenTelemetry Java Agent (Easiest)
# Start your application with the Java Agent
java -javaagent:opentelemetry-javaagent.jar \
-Dotel.service.name=my-service \
-Dotel.exporter.otlp.endpoint=http://localhost:4317 \
-jar your-application.jar
// Just enable tracing - uses GlobalOpenTelemetry from the agent
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.enableTracing(true) // That's it!
.build();
Option 2: With Explicit OpenTelemetry Instance
OpenTelemetry openTelemetry = // configure your OpenTelemetry instance
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.openTelemetry(openTelemetry, true) // Set OpenTelemetry AND enable tracing
.build();
Option 3: Using GlobalOpenTelemetry
// Configure GlobalOpenTelemetry once in your application
GlobalOpenTelemetry.set(myOpenTelemetry);
// Enable tracing in the client - uses GlobalOpenTelemetry
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.enableTracing(true)
.build();
What happens when tracing is enabled:
- Create spans for producer send operations
- Inject trace context into message properties automatically
- Create spans for consumer receive/ack operations
- Extract trace context from message properties automatically
- Link all spans to create end-to-end distributed traces
3. Manual Interceptor Configuration (Advanced)
If you prefer manual control, you can add interceptors explicitly:
import org.apache.pulsar.client.impl.tracing.OpenTelemetryProducerInterceptor;
import org.apache.pulsar.client.impl.tracing.OpenTelemetryConsumerInterceptor;
// Create client (tracing not enabled globally)
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.openTelemetry(openTelemetry)
.build();
// Add interceptor manually to specific producer
Producer<String> producer = client.newProducer(Schema.STRING)
.topic("my-topic")
.intercept(new OpenTelemetryProducerInterceptor())
.create();
// Add interceptor manually to specific consumer
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("my-topic")
.subscriptionName("my-subscription")
.intercept(new OpenTelemetryConsumerInterceptor<>())
.subscribe();
Advanced Usage
End-to-End Tracing Example
This example shows how to create a complete trace from an HTTP request through Pulsar to a consumer:
// Service 1: HTTP API that publishes to Pulsar
@POST
@Path("/order")
public Response createOrder(@Context HttpHeaders headers, Order order) {
// Extract trace context from incoming HTTP request
Context context = TracingProducerBuilder.extractFromHeaders(
convertHeaders(headers));
// Publish to Pulsar with trace context
TracingProducerBuilder tracingBuilder = new TracingProducerBuilder();
producer.newMessage()
.value(order)
.let(builder -> tracingBuilder.injectContext(builder, context))
.send();
return Response.accepted().build();
}
// Service 2: Pulsar consumer that processes orders
Consumer<Order> consumer = client.newConsumer(Schema.JSON(Order.class))
.topic("orders")
.subscriptionName("order-processor")
.intercept(new OpenTelemetryConsumerInterceptor<>())
.subscribe();
while (true) {
Message<Order> msg = consumer.receive();
// Trace context is automatically extracted
// Any spans created here will be part of the same trace
processOrder(msg.getValue());
consumer.acknowledge(msg);
}
Custom Span Creation
You can create custom spans during message processing:
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
Tracer tracer = GlobalOpenTelemetry.get().getTracer("my-app");
Message<String> msg = consumer.receive();
// Create a custom span for processing
Span span = tracer.spanBuilder("process-message")
.setSpanKind(SpanKind.INTERNAL)
.startSpan();
try (Scope scope = span.makeCurrent()) {
// Your processing logic
processMessage(msg.getValue());
span.setStatus(StatusCode.OK);
} catch (Exception e) {
span.recordException(e);
span.setStatus(StatusCode.ERROR);
throw e;
} finally {
span.end();
consumer.acknowledge(msg);
}
Configuration
Compatibility with OpenTelemetry Java Agent
This implementation is fully compatible with the OpenTelemetry Java Instrumentation for Pulsar:
- Both use W3C TraceContext format (traceparent, tracestate headers)
- Both propagate context via message properties
- No conflicts: Our implementation checks if trace context is already present (from Java Agent) and avoids duplicate injection
- You can use either approach or both together
Using OpenTelemetry Java Agent
The easiest way to enable tracing is using the OpenTelemetry Java Agent (automatic instrumentation):
java -javaagent:path/to/opentelemetry-javaagent.jar \
-Dotel.service.name=my-service \
-Dotel.exporter.otlp.endpoint=http://localhost:4317 \
-jar your-application.jar
Note: When using the Java Agent, you don't need to call .openTelemetry(otel, true) as the agent automatically instruments Pulsar. However, calling it won't cause conflicts.
Programmatic Configuration
You can also configure OpenTelemetry programmatically:
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
OtlpGrpcSpanExporter spanExporter = OtlpGrpcSpanExporter.builder()
.setEndpoint("http://localhost:4317")
.build();
SdkTracerProvider tracerProvider = SdkTracerProvider.builder()
.addSpanProcessor(BatchSpanProcessor.builder(spanExporter).build())
.build();
OpenTelemetrySdk openTelemetry = OpenTelemetrySdk.builder()
.setTracerProvider(tracerProvider)
.buildAndRegisterGlobal();
Environment Variables
Configure via environment variables:
export OTEL_SERVICE_NAME=my-service
export OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317
export OTEL_TRACES_EXPORTER=otlp
export OTEL_METRICS_EXPORTER=otlp
Span Attributes
The tracing implementation adds the following attributes to spans following the OpenTelemetry messaging semantic conventions:
Producer Spans
messaging.system: "pulsar"messaging.destination.name: Topic namemessaging.operation.name: "send"messaging.message.id: Message ID (added when broker confirms)
Span naming: send {topic} (e.g., "send my-topic")
Consumer Spans
messaging.system: "pulsar"messaging.destination.name: Topic namemessaging.destination.subscription.name: Subscription namemessaging.operation.name: "process"messaging.message.id: Message IDmessaging.pulsar.acknowledgment.type: How the message was acknowledged"acknowledge": Normal individual acknowledgment"cumulative_acknowledge": Cumulative acknowledgment"negative_acknowledge": Message negatively acknowledged (will retry)"ack_timeout": Acknowledgment timeout occurred (will retry)
Span naming: process {topic} (e.g., "process my-topic")
Span Lifecycle and Acknowledgment Behavior
Understanding how spans are handled for different acknowledgment scenarios. Every consumer span includes a messaging.pulsar.acknowledgment.type attribute indicating how it was completed:
Successful Acknowledgment
- Span ends with OK status
- Attribute:
messaging.pulsar.acknowledgment.type = "acknowledge"
Cumulative Acknowledgment
- Span ends with OK status
- Attribute:
messaging.pulsar.acknowledgment.type = "cumulative_acknowledge" - All spans up to the acknowledged position are ended with this attribute
Negative Acknowledgment
- Span ends with OK status (not an error)
- Attribute:
messaging.pulsar.acknowledgment.type = "negative_acknowledge" - This is normal flow, not a failure - the message will be redelivered and a new span will be created
Acknowledgment Timeout
- Span ends with OK status (not an error)
- Attribute:
messaging.pulsar.acknowledgment.type = "ack_timeout" - This is expected behavior when
ackTimeoutis configured - the message will be redelivered and a new span will be created
Application Exception During Processing
- If your application code throws an exception, create a child span and mark it with ERROR status
- The consumer span itself will end normally when you call
negativeAcknowledge() - This provides clear separation between messaging operations (OK) and application logic (ERROR)
Example - Separating messaging and application errors:
Message<String> msg = consumer.receive();
Span processingSpan = tracer.spanBuilder("business-logic").startSpan();
try (Scope scope = processingSpan.makeCurrent()) {
processMessage(msg.getValue());
processingSpan.setStatus(StatusCode.OK);
consumer.acknowledge(msg); // Consumer span ends with acknowledgment.type="acknowledge"
} catch (Exception e) {
processingSpan.recordException(e);
processingSpan.setStatus(StatusCode.ERROR); // Business logic failed
consumer.negativeAcknowledge(msg); // Consumer span ends with acknowledgment.type="negative_acknowledge"
throw e;
} finally {
processingSpan.end();
}
Querying by Acknowledgment Type
The messaging.pulsar.acknowledgment.type attribute allows you to filter and analyze spans:
Example queries in your tracing backend:
- Find all retried messages:
messaging.pulsar.acknowledgment.type = "negative_acknowledge" OR "ack_timeout" - Calculate retry rate:
count(negative_acknowledge) / count(acknowledge) - Identify timeout issues:
messaging.pulsar.acknowledgment.type = "ack_timeout" - Analyze cumulative vs individual acks: Group by
messaging.pulsar.acknowledgment.type
Best Practices
-
Always use interceptors: Add tracing interceptors to both producers and consumers for complete visibility.
-
Propagate context from HTTP: When publishing from HTTP endpoints, always extract and propagate the trace context.
-
Handle errors properly: Ensure spans are ended even when exceptions occur.
-
Distinguish messaging vs. application errors:
- Messaging operations (nack, timeout) end with OK status + events
- Application failures should be tracked in separate child spans with ERROR status
-
Use meaningful span names: The default span names include the topic name for easy identification.
-
Consider performance: Tracing adds minimal overhead, but in high-throughput scenarios, consider sampling.
-
Clean up resources: Ensure interceptors and OpenTelemetry SDK are properly closed when shutting down.
Troubleshooting
Traces not appearing
- Verify OpenTelemetry SDK is configured and exporters are set up
- Check that interceptors are added to producers/consumers
- Verify trace exporter endpoint is reachable
- Enable debug logging:
-Dio.opentelemetry.javaagent.debug=true
Missing parent-child relationships
- Ensure trace context is being injected via
TracingProducerBuilder.injectContext() - Verify message properties contain
traceparentheader - Check that both producer and consumer have tracing interceptors
High overhead
- Consider using sampling:
-Dotel.traces.sampler=parentbased_traceidratio -Dotel.traces.sampler.arg=0.1 - Use batch span processor (default)
- Adjust batch processor settings if needed
Examples
See the following files for complete examples:
TracingExampleTest.java- Comprehensive usage examplesOpenTelemetryTracingTest.java- Unit tests demonstrating API usage
API Reference
Main Classes
OpenTelemetryProducerInterceptor- Producer interceptor for tracingOpenTelemetryConsumerInterceptor- Consumer interceptor for tracingTracingContext- Utility methods for span creation and context propagationTracingProducerBuilder- Helper for injecting trace context into messages