Interface ClientBuilder
- All Superinterfaces:
Cloneable, Serializable
Builder interface that is used to configure and construct a PulsarClient instance.
- Since:
- 2.0.0
-
Method Summary
Method - Description
allowTlsInsecureConnection(boolean allowTlsInsecureConnection) - Configure whether the Pulsar client accepts an untrusted TLS certificate from the broker (default: false).
authentication(String authPluginClassName, String authParamsString) - Configure the authentication provider to use in the Pulsar client instance.
authentication(String authPluginClassName, Map<String, String> authParams) - Configure the authentication provider to use in the Pulsar client instance using a config map.
authentication(Authentication authentication) - Set the authentication provider to use in the Pulsar client instance.
autoCertRefreshSeconds(int autoCertRefreshSeconds) - Set the certificate refresh interval in seconds.
build() - Construct the final PulsarClient instance.
clock(Clock clock) - The clock used by the Pulsar client.
clone() - Create a copy of the current client builder.
connectionMaxIdleSeconds(int connectionMaxIdleSeconds) - Release the connection if it is not used for more than connectionMaxIdleSeconds seconds.
connectionsPerBroker(int connectionsPerBroker) - Sets the max number of connections that the client library will open to a single broker.
connectionTimeout(int duration, TimeUnit unit) - Set the duration of time to wait for a connection to a broker to be established.
dnsLookupBind(String address, int port) - Set the DNS lookup bind address and port.
dnsServerAddresses(List<InetSocketAddress> addresses) - Set the DNS lookup server addresses.
enableBusyWait(boolean enableBusyWait) - Option to enable busy-wait settings.
enableTcpNoDelay(boolean enableTcpNoDelay) - Configure whether to use the TCP no-delay flag on the connection, to disable Nagle's algorithm.
enableTls(boolean enableTls) - Deprecated. Use "pulsar+ssl://" in serviceUrl to enable TLS.
enableTlsHostnameVerification(boolean enableTlsHostnameVerification) - Enable validation of the broker's hostname when the client connects to the broker over TLS.
enableTransaction(boolean enableTransaction) - If transactions are enabled, start the transactionCoordinatorClient with the Pulsar client.
ioThreads(int numIoThreads) - Set the number of threads to be used for handling connections to brokers (default: Runtime.getRuntime().availableProcessors()).
keepAliveInterval(int keepAliveInterval, TimeUnit unit) - Set the keep-alive interval for each client-broker connection.
listenerName(String name) - Configure the listenerName for which the broker will return the corresponding `advertisedListener`.
listenerThreads(int numListenerThreads) - Set the number of threads to be used for message listeners (default: Runtime.getRuntime().availableProcessors()).
loadConf(Map<String, Object> config) - Load the configuration from the provided config map.
lookupProperties(Map<String, String> properties) - Set the properties used for topic lookup.
lookupTimeout(int lookupTimeout, TimeUnit unit) - Set the lookup timeout (default: matches the operation timeout).
maxBackoffInterval(long duration, TimeUnit unit) - Set the maximum duration of time for a backoff interval.
maxConcurrentLookupRequests(int maxConcurrentLookupRequests) - Number of concurrent lookup requests allowed on each broker connection, to prevent overloading the broker.
maxLookupRedirects(int maxLookupRedirects) - Set the maximum number of times a lookup request to a broker will be redirected.
maxLookupRequests(int maxLookupRequests) - Maximum number of lookup requests allowed on each broker connection, to prevent overloading the broker.
maxNumberOfRejectedRequestPerConnection(int maxNumberOfRejectedRequestPerConnection) - Set the maximum number of broker-rejected requests within a certain time frame (60 seconds), after which the current connection will be closed and the client will create a new connection, giving it a chance to connect to a different broker (default: 50).
memoryLimit(long memoryLimit, SizeUnit unit) - Configure a limit on the amount of direct memory that will be allocated by this client instance.
openTelemetry(io.opentelemetry.api.OpenTelemetry openTelemetry) - Configure OpenTelemetry for the Pulsar client.
operationTimeout(int operationTimeout, TimeUnit unit) - Set the operation timeout (default: 30 seconds).
proxyServiceUrl(String proxyServiceUrl, ProxyProtocol proxyProtocol) - Proxy service URL to use when the client connects to the broker via a proxy.
serviceUrl(String serviceUrl) - Configure the service URL for the Pulsar service.
serviceUrlProvider(ServiceUrlProvider serviceUrlProvider) - Configure the service URL provider for the Pulsar service.
socks5ProxyAddress(InetSocketAddress socks5ProxyAddress) - Set the SOCKS5 proxy address.
socks5ProxyPassword(String socks5ProxyPassword) - Set the SOCKS5 proxy password.
socks5ProxyUsername(String socks5ProxyUsername) - Set the SOCKS5 proxy username.
sslFactoryPlugin(String sslFactoryPlugin) - Set the SSL Factory Plugin for a custom implementation that creates the SSLContext and SSLEngine.
sslFactoryPluginParams(String sslFactoryPluginParams) - Set the params for the SSL Factory Plugin to use.
sslProvider(String sslProvider) - The name of the security provider used for SSL connections.
startingBackoffInterval(long duration, TimeUnit unit) - Set the duration of time for the initial backoff interval.
statsInterval(long statsInterval, TimeUnit unit) - Deprecated.
tlsCertificateFilePath(String tlsCertificateFilePath) - Set the path to the TLS certificate file.
tlsCiphers(Set<String> tlsCiphers) - A list of cipher suites.
tlsKeyFilePath(String tlsKeyFilePath) - Set the path to the TLS key file.
tlsKeyStorePassword(String tlsKeyStorePassword) - The store password for the key store file.
tlsKeyStorePath(String tlsTrustStorePath) - The location of the key store file.
tlsKeyStoreType(String tlsKeyStoreType) - The file format of the key store file.
tlsProtocols(Set<String> tlsProtocols) - The SSL protocol used to generate the SSLContext.
tlsTrustCertsFilePath(String tlsTrustCertsFilePath) - Set the path to the trusted TLS certificate file.
tlsTrustStorePassword(String tlsTrustStorePassword) - The store password for the trust store file.
tlsTrustStorePath(String tlsTrustStorePath) - The location of the trust store file.
tlsTrustStoreType(String tlsTrustStoreType) - The file format of the trust store file.
useKeyStoreTls(boolean useKeyStoreTls) - If TLS is enabled, whether to use the KeyStore type as the TLS configuration parameter.
-
Method Details
-
build
Construct the final PulsarClient instance.
- Returns:
- the new PulsarClient instance
- Throws:
PulsarClientException
-
loadConf
Load the configuration from the provided config map.
Example:
    Map<String, Object> config = new HashMap<>();
    config.put("serviceUrl", "pulsar://localhost:6650");
    config.put("numIoThreads", 20);

    ClientBuilder builder = ...;
    builder = builder.loadConf(config);

    PulsarClient client = builder.build();
- Parameters:
config - configuration to load
- Returns:
- the client builder instance
-
clone
ClientBuilder clone()
Create a copy of the current client builder.
Cloning the builder can be used to share an incomplete configuration and specialize it multiple times. For example:
    ClientBuilder builder = PulsarClient.builder()
            .ioThreads(8)
            .listenerThreads(4);

    PulsarClient client1 = builder.clone()
            .serviceUrl("pulsar://localhost:6650").build();
    PulsarClient client2 = builder.clone()
            .serviceUrl("pulsar://other-host:6650").build();
- Returns:
- a clone of the client builder instance
-
serviceUrl
Configure the service URL for the Pulsar service.
This parameter is required.
Examples:
- pulsar://my-broker:6650 for a regular endpoint
- pulsar+ssl://my-broker:6651 for a TLS-encrypted endpoint
- Parameters:
serviceUrl - the URL of the Pulsar service that the client should connect to
- Returns:
- the client builder instance
-
serviceUrlProvider
Configure the service URL provider for the Pulsar service.
Instead of specifying a static service URL string (with serviceUrl(String)), an application can pass a ServiceUrlProvider instance that dynamically provides a service URL.
- Parameters:
serviceUrlProvider - the provider instance
- Returns:
- the client builder instance
-
listenerName
Configure the listenerName for which the broker will return the corresponding `advertisedListener`.
- Parameters:
name - the listener name
- Returns:
- the client builder instance
-
connectionMaxIdleSeconds
Release the connection if it is not used for more than connectionMaxIdleSeconds seconds. Defaults to 25 seconds.
- Returns:
- the client builder instance
-
authentication
Set the authentication provider to use in the Pulsar client instance.
Example:
    PulsarClient client = PulsarClient.builder()
            .serviceUrl("pulsar+ssl://broker.example.com:6651/")
            .authentication(
                    AuthenticationFactory.TLS("/my/cert/file", "/my/key/file"))
            .build();
For token-based authentication, this will look like:
    AuthenticationFactory
            .token("eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJKb2UifQ.ipevRNuRP6HflG8cFKnmUPtypruRC4fb1DWtoLL62SY")
- Parameters:
authentication - an instance of the Authentication provider already constructed
- Returns:
- the client builder instance
-
authentication
ClientBuilder authentication(String authPluginClassName, String authParamsString) throws PulsarClientException.UnsupportedAuthenticationException
Configure the authentication provider to use in the Pulsar client instance.
Example:
    PulsarClient client = PulsarClient.builder()
            .serviceUrl("pulsar+ssl://broker.example.com:6651/")
            .authentication(
                    "org.apache.pulsar.client.impl.auth.AuthenticationTls",
                    "tlsCertFile:/my/cert/file,tlsKeyFile:/my/key/file")
            .build();
- Parameters:
authPluginClassName - name of the Authentication-Plugin you want to use
authParamsString - string which represents parameters for the Authentication-Plugin, e.g., "key1:val1,key2:val2"
- Returns:
- the client builder instance
- Throws:
PulsarClientException.UnsupportedAuthenticationException - failed to instantiate the specified Authentication-Plugin
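The "key1:val1,key2:val2" shape of authParamsString can be illustrated with a small JDK-only sketch. This is not Pulsar's own parser; the class name AuthParamsSketch and the choice to split each pair on its first colon are assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: a hypothetical helper, not part of the Pulsar client. */
final class AuthParamsSketch {

    /** Splits "key1:val1,key2:val2" into an insertion-ordered key/value map. */
    static Map<String, String> parse(String authParamsString) {
        Map<String, String> params = new LinkedHashMap<>();
        if (authParamsString == null || authParamsString.trim().isEmpty()) {
            return params;
        }
        for (String pair : authParamsString.split(",")) {
            // Assumption: split on the first ':' only, so a value may itself contain ':'.
            int sep = pair.indexOf(':');
            if (sep <= 0) {
                throw new IllegalArgumentException("Expected key:value but got: " + pair);
            }
            params.put(pair.substring(0, sep).trim(), pair.substring(sep + 1).trim());
        }
        return params;
    }

    public static void main(String[] args) {
        // prints {tlsCertFile=/my/cert/file, tlsKeyFile=/my/key/file}
        System.out.println(parse("tlsCertFile:/my/cert/file,tlsKeyFile:/my/key/file"));
    }
}
```

The resulting map has the same shape as the authParams argument accepted by the Map-based authentication overload.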
-
authentication
ClientBuilder authentication(String authPluginClassName, Map<String, String> authParams) throws PulsarClientException.UnsupportedAuthenticationException
Configure the authentication provider to use in the Pulsar client instance using a config map.
Example:
    Map<String, String> conf = new TreeMap<>();
    conf.put("tlsCertFile", "/my/cert/file");
    conf.put("tlsKeyFile", "/my/key/file");

    PulsarClient client = PulsarClient.builder()
            .serviceUrl("pulsar+ssl://broker.example.com:6651/")
            .authentication(
                    "org.apache.pulsar.client.impl.auth.AuthenticationTls", conf)
            .build();
- Parameters:
authPluginClassName - name of the Authentication-Plugin you want to use
authParams - map which represents parameters for the Authentication-Plugin
- Returns:
- the client builder instance
- Throws:
PulsarClientException.UnsupportedAuthenticationException - failed to instantiate the specified Authentication-Plugin
-
operationTimeout
Set the operation timeout (default: 30 seconds).
Producer-create, subscribe and unsubscribe operations will be retried until this interval elapses, after which the operation will be marked as failed.
- Parameters:
operationTimeout - operation timeout
unit - time unit for operationTimeout
- Returns:
- the client builder instance
-
lookupTimeout
Set the lookup timeout (default: matches the operation timeout).
Lookup operations have a different load pattern from other operations. They can be handled by any broker, are not proportional to throughput, and are harmless to retry. Given this, it makes sense to allow them to retry for longer than normal operations, especially if they experience a timeout.
By default, this is set to match the operation timeout, to maintain legacy behaviour. In practice, however, it should be set to 5-10x the operation timeout.
- Parameters:
lookupTimeout - lookup timeout
unit - time unit for lookupTimeout
- Returns:
- the client builder instance
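The 5-10x guidance above is simple arithmetic; the following JDK-only sketch derives a lookup timeout from the operation timeout. The helper name lookupTimeoutFor and the factor of 5 are illustrative assumptions, not part of the Pulsar API.

```java
import java.time.Duration;

/** Illustrative only: a hypothetical helper, not part of the Pulsar client. */
final class LookupTimeoutSketch {

    /** Scales the operation timeout by a factor; the text above suggests 5 to 10. */
    static Duration lookupTimeoutFor(Duration operationTimeout, int factor) {
        if (factor < 1) {
            throw new IllegalArgumentException("factor must be at least 1");
        }
        return operationTimeout.multipliedBy(factor);
    }

    public static void main(String[] args) {
        Duration operationTimeout = Duration.ofSeconds(30); // documented default
        Duration lookupTimeout = lookupTimeoutFor(operationTimeout, 5);
        // The int value is what one might pass as lookupTimeout(seconds, TimeUnit.SECONDS).
        System.out.println(Math.toIntExact(lookupTimeout.getSeconds())); // prints 150
    }
}
```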
-
ioThreads
Set the number of threads to be used for handling connections to brokers (default: Runtime.getRuntime().availableProcessors()).
- Parameters:
numIoThreads - the number of IO threads
- Returns:
- the client builder instance
-
listenerThreads
Set the number of threads to be used for message listeners (default: Runtime.getRuntime().availableProcessors()).
The listener thread pool is shared across all the consumers and readers that are using a "listener" model to get messages. For a given consumer, the listener will always be invoked from the same thread, to ensure ordering.
- Parameters:
numListenerThreads - the number of listener threads
- Returns:
- the client builder instance
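One way to picture "always invoked from the same thread" is a fixed array of single-threaded executors with each consumer pinned to one slot. The sketch below is not how the Pulsar client is implemented; the class ListenerPinningSketch and the hash-based mapping from a consumer name to a thread are assumptions for illustration only.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Illustrative only: a hypothetical model, not the Pulsar client's internals. */
final class ListenerPinningSketch implements AutoCloseable {
    private final ExecutorService[] listenerThreads;

    ListenerPinningSketch(int numListenerThreads) {
        if (numListenerThreads < 1) {
            throw new IllegalArgumentException("need at least one listener thread");
        }
        listenerThreads = new ExecutorService[numListenerThreads];
        for (int i = 0; i < numListenerThreads; i++) {
            // One thread per slot, so tasks submitted to a slot run in submission order.
            listenerThreads[i] = Executors.newSingleThreadExecutor();
        }
    }

    /** Assumption: pin a consumer to a slot by hashing its name. */
    ExecutorService threadFor(String consumerName) {
        int index = Math.floorMod(consumerName.hashCode(), listenerThreads.length);
        return listenerThreads[index];
    }

    @Override
    public void close() {
        for (ExecutorService thread : listenerThreads) {
            thread.shutdown();
        }
    }

    public static void main(String[] args) {
        try (ListenerPinningSketch pool = new ListenerPinningSketch(4)) {
            // The same consumer always maps to the same single-threaded executor.
            System.out.println(pool.threadFor("consumer-a") == pool.threadFor("consumer-a")); // prints true
        }
    }
}
```

Because several consumers can share a slot, a slow listener in this model would delay the other consumers pinned to the same thread.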
-
connectionsPerBroker
Sets the max number of connections that the client library will open to a single broker.
By default, the connection pool will use a single connection for all the producers and consumers. Increasing this parameter may improve throughput when using many producers over a high-latency connection.
- Parameters:
connectionsPerBroker - max number of connections per broker (needs to be greater than or equal to 0)
- Returns:
- the client builder instance
-
enableTcpNoDelay
Configure whether to use the TCP no-delay flag on the connection, to disable Nagle's algorithm.
No-delay makes sure packets are sent out on the network as soon as possible, which is critical for achieving low-latency publishes. On the other hand, sending out a huge number of small packets might limit the overall throughput, so if latency is not a concern, it's advisable to set the useTcpNoDelay flag to false.
Default value is true.
- Parameters:
enableTcpNoDelay - whether to enable the TCP no-delay feature
- Returns:
- the client builder instance
-
enableTls
Deprecated. Use "pulsar+ssl://" in serviceUrl to enable TLS.
Configure whether to use TLS encryption on the connection (default: true if serviceUrl starts with "pulsar+ssl://", false otherwise).
- Parameters:
enableTls - whether to use TLS encryption on the connection
- Returns:
- the client builder instance
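The default described above depends only on the service URL prefix. A minimal JDK-only sketch of that rule follows; the helper name usesTls is hypothetical and the check mirrors the documented wording rather than the client's actual code.

```java
/** Illustrative only: a hypothetical helper, not part of the Pulsar client. */
final class ServiceUrlTlsSketch {

    /** Mirrors the documented default: TLS is on when the URL starts with "pulsar+ssl://". */
    static boolean usesTls(String serviceUrl) {
        return serviceUrl != null && serviceUrl.startsWith("pulsar+ssl://");
    }

    public static void main(String[] args) {
        System.out.println(usesTls("pulsar+ssl://my-broker:6651")); // prints true
        System.out.println(usesTls("pulsar://my-broker:6650"));     // prints false
    }
}
```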
-
tlsKeyFilePath
Set the path to the TLS key file.
- Parameters:
tlsKeyFilePath - the path to the TLS key file
- Returns:
- the client builder instance
-
tlsCertificateFilePath
Set the path to the TLS certificate file.
- Parameters:
tlsCertificateFilePath - the path to the TLS certificate file
- Returns:
- the client builder instance
-
tlsTrustCertsFilePath
Set the path to the trusted TLS certificate file.
- Parameters:
tlsTrustCertsFilePath - the path to the trusted TLS certificate file
- Returns:
- the client builder instance
-
allowTlsInsecureConnection
Configure whether the Pulsar client accepts an untrusted TLS certificate from the broker (default: false).
- Parameters:
allowTlsInsecureConnection - whether to accept an untrusted TLS certificate
- Returns:
- the client builder instance
-
enableTlsHostnameVerification
Enable validation of the broker's hostname when the client connects to the broker over TLS. The client validates the incoming X.509 certificate and matches the hostname it provides (CN/SAN) against the expected broker hostname. Verification follows RFC 2818, section 3.1 (Server Identity).
- Parameters:
enableTlsHostnameVerification - whether to enable TLS hostname verification
- Returns:
- the client builder instance
-
useKeyStoreTls
If TLS is enabled, whether to use the KeyStore type as the TLS configuration parameter. False means the default PEM type configuration is used.
- Parameters:
useKeyStoreTls - whether to use the KeyStore type for TLS configuration
- Returns:
- the client builder instance
-
sslProvider
The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.
- Parameters:
sslProvider - the name of the security provider
- Returns:
- the client builder instance
-
tlsKeyStoreType
The file format of the key store file.
- Parameters:
tlsKeyStoreType - the file format of the key store file
- Returns:
- the client builder instance
-
tlsKeyStorePath
The location of the key store file.
- Parameters:
tlsTrustStorePath - the location of the key store file
- Returns:
- the client builder instance
-
tlsKeyStorePassword
The store password for the key store file.
- Parameters:
tlsKeyStorePassword - the store password for the key store file
- Returns:
- the client builder instance
-
tlsTrustStoreType
The file format of the trust store file.
- Parameters:
tlsTrustStoreType - the file format of the trust store file
- Returns:
- the client builder instance
-
tlsTrustStorePath
The location of the trust store file.
- Parameters:
tlsTrustStorePath - the location of the trust store file
- Returns:
- the client builder instance
-
tlsTrustStorePassword
The store password for the trust store file.
- Parameters:
tlsTrustStorePassword - the store password for the trust store file
- Returns:
- the client builder instance
-
tlsCiphers
A list of cipher suites. A cipher suite is a named combination of authentication, encryption, MAC and key exchange algorithms used to negotiate the security settings for a network connection using the TLS or SSL network protocol. By default all the available cipher suites are supported.
- Parameters:
tlsCiphers - the cipher suites
- Returns:
- the client builder instance
-
tlsProtocols
The SSL protocol used to generate the SSLContext. Default setting is TLS, which is fine for most cases. Allowed values in recent JVMs are TLS, TLSv1.3, TLSv1.2 and TLSv1.1.
- Parameters:
tlsProtocols - the SSL protocols
- Returns:
- the client builder instance
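Since the allowed protocol names depend on the JVM, it can help to check a candidate set against what the running JVM reports before passing it to tlsProtocols. The sketch below uses only the standard javax.net.ssl API; the class TlsProtocolsSketch and the helper supported are hypothetical, and filtering this way is a suggestion rather than something the Pulsar client requires.

```java
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;
import javax.net.ssl.SSLContext;

/** Illustrative only: a hypothetical helper, not part of the Pulsar client. */
final class TlsProtocolsSketch {

    /** Keeps only the requested protocol names that the JVM's default SSLContext supports. */
    static Set<String> supported(Set<String> requested) {
        Set<String> jvmProtocols;
        try {
            jvmProtocols = new LinkedHashSet<>(Arrays.asList(
                    SSLContext.getDefault().getSupportedSSLParameters().getProtocols()));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("No default SSLContext available", e);
        }
        Set<String> result = new LinkedHashSet<>(requested);
        result.retainAll(jvmProtocols);
        return result;
    }

    public static void main(String[] args) {
        Set<String> wanted = new LinkedHashSet<>(Arrays.asList("TLSv1.3", "TLSv1.2"));
        // The output depends on the JVM; the filtered set is what one might pass to tlsProtocols(...).
        System.out.println(supported(wanted));
    }
}
```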
-
memoryLimit
Configure a limit on the amount of direct memory that will be allocated by this client instance.
Note: at this moment this only limits the memory for producers.
Setting this to 0 will disable the limit.
- Parameters:
memoryLimit - the limit
unit - the memory limit size unit
- Returns:
- the client builder instance
-
statsInterval
Deprecated.
Set the interval between each stats info (default: 60 seconds). Stats are activated when statsInterval is positive, and it should be set to at least 1 second.
- Parameters:
statsInterval - the interval between each stats info
unit - time unit for statsInterval
- Returns:
- the client builder instance
-
maxConcurrentLookupRequests
Number of concurrent lookup requests allowed on each broker connection, to prevent overloading the broker (default: 5000). It should be configured with a higher value only when the created PulsarClient needs to produce to or subscribe on thousands of topics.
- Parameters:
maxConcurrentLookupRequests - the number of concurrent lookup requests allowed per broker connection
- Returns:
- the client builder instance
-
maxLookupRequests
Maximum number of lookup requests allowed on each broker connection, to prevent overloading the broker (default: 50000). It should be greater than maxConcurrentLookupRequests. Requests within maxConcurrentLookupRequests are sent to the broker immediately; requests beyond maxConcurrentLookupRequests but under maxLookupRequests wait in each client connection.
- Parameters:
maxLookupRequests - the maximum number of lookup requests allowed per broker connection
- Returns:
- the client builder instance
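The interplay between maxConcurrentLookupRequests and maxLookupRequests can be sketched as a three-way classification of a new lookup request. This is a simplified model, not the client's implementation: the class LookupAdmissionSketch is hypothetical, and treating requests beyond maxLookupRequests as rejected is an assumption, since the text above does not say what happens to them.

```java
/** Illustrative only: a hypothetical model, not the Pulsar client's internals. */
final class LookupAdmissionSketch {

    enum Outcome { SENT, QUEUED, REJECTED }

    /**
     * Classifies a new lookup request given how many lookups are already
     * outstanding (in flight plus waiting) on one broker connection.
     */
    static Outcome admit(int outstanding, int maxConcurrentLookupRequests, int maxLookupRequests) {
        if (outstanding < maxConcurrentLookupRequests) {
            return Outcome.SENT;   // within the concurrent limit: goes to the broker
        }
        if (outstanding < maxLookupRequests) {
            return Outcome.QUEUED; // over the concurrent limit: waits in the client connection
        }
        return Outcome.REJECTED;   // assumption: behaviour past maxLookupRequests is not documented above
    }

    public static void main(String[] args) {
        // Documented defaults: 5000 concurrent, 50000 total.
        System.out.println(admit(0, 5000, 50000));     // prints SENT
        System.out.println(admit(5000, 5000, 50000));  // prints QUEUED
        System.out.println(admit(50000, 5000, 50000)); // prints REJECTED
    }
}
```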
-
maxLookupRedirects
Set the maximum number of times a lookup request to a broker will be redirected.
- Parameters:
maxLookupRedirects - the maximum number of redirects
- Returns:
- the client builder instance
- Since:
- 2.6.0
-
maxNumberOfRejectedRequestPerConnection
Set the maximum number of broker-rejected requests within a certain time frame (60 seconds), after which the current connection will be closed and the client will create a new connection, giving it a chance to connect to a different broker (default: 50).
- Parameters:
maxNumberOfRejectedRequestPerConnection - the maximum number of broker-rejected requests per connection
- Returns:
- the client builder instance
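The rule above amounts to counting rejections inside a 60-second window and signalling when the count passes the limit. The sketch below models that with a fixed window; the class RejectionWindowSketch is hypothetical, and both the fixed-window reset and the strict "greater than" comparison are assumptions, because the text does not specify them.

```java
/** Illustrative only: a hypothetical model, not the Pulsar client's internals. */
final class RejectionWindowSketch {
    private final int maxRejected;
    private final long windowMillis;
    private long windowStartMillis;
    private int rejectedInWindow;

    RejectionWindowSketch(int maxRejected, long windowMillis) {
        this.maxRejected = maxRejected;
        this.windowMillis = windowMillis;
    }

    /** Records one broker rejection; returns true when the connection should be closed. */
    boolean recordRejection(long nowMillis) {
        // Assumption: a fixed window that restarts once the time frame has elapsed.
        if (nowMillis - windowStartMillis >= windowMillis) {
            windowStartMillis = nowMillis;
            rejectedInWindow = 0;
        }
        rejectedInWindow++;
        // Assumption: close only after the limit has been exceeded, not merely reached.
        return rejectedInWindow > maxRejected;
    }

    public static void main(String[] args) {
        RejectionWindowSketch window = new RejectionWindowSketch(2, 60_000L);
        System.out.println(window.recordRejection(0L));      // prints false
        System.out.println(window.recordRejection(1L));      // prints false
        System.out.println(window.recordRejection(2L));      // prints true
        System.out.println(window.recordRejection(60_000L)); // prints false (new window)
    }
}
```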
-
keepAliveInterval
Set the keep-alive interval for each client-broker connection (default: 30 seconds).
- Parameters:
keepAliveInterval - the keep-alive interval
unit - the time unit in which the keepAliveInterval is defined
- Returns:
- the client builder instance
-
connectionTimeout
Set the duration of time to wait for a connection to a broker to be established. If the duration passes without a response from the broker, the connection attempt is dropped.
- Parameters:
duration - the duration to wait
unit - the time unit in which the duration is defined
- Returns:
- the client builder instance
- Since:
- 2.3.0
-
startingBackoffInterval
Set the duration of time for the initial backoff interval.
- Parameters:
duration - the duration of the interval
unit - the time unit in which the duration is defined
- Returns:
- the client builder instance
-
maxBackoffInterval
Set the maximum duration of time for a backoff interval.
- Parameters:
duration - the duration of the interval
unit - the time unit in which the duration is defined
- Returns:
- the client builder instance
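The starting and maximum backoff intervals bound a retry schedule. The sketch below shows one common shape, doubling from the starting interval and capping at the maximum. The class BackoffSketch is hypothetical, and the doubling policy is an assumption; the actual policy used by the client (including any jitter) is not described here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

/** Illustrative only: a hypothetical model, not the Pulsar client's backoff policy. */
final class BackoffSketch {

    /** Returns the delay, in milliseconds, before each of the given number of attempts. */
    static List<Long> delaysMillis(long startingInterval, long maxInterval, TimeUnit unit, int attempts) {
        long current = unit.toMillis(startingInterval);
        long cap = unit.toMillis(maxInterval);
        if (current <= 0 || cap < current) {
            throw new IllegalArgumentException("need 0 < startingInterval <= maxInterval");
        }
        List<Long> delays = new ArrayList<>();
        for (int i = 0; i < attempts; i++) {
            delays.add(current);
            // Assumption: double after every attempt, never exceeding the maximum.
            current = Math.min(current * 2, cap);
        }
        return delays;
    }

    public static void main(String[] args) {
        // prints [100, 200, 400, 800, 1000, 1000]
        System.out.println(delaysMillis(100, 1000, TimeUnit.MILLISECONDS, 6));
    }
}
```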
-
enableBusyWait
Option to enable busy-wait settings. Default is false.
WARNING: This option will enable spin-waiting on executors and IO threads in order to reduce latency during context switches. The spinning will consume 100% CPU even when the broker is not doing any work. It is recommended to reduce the number of IO threads and BK client threads so that only a few CPU cores are kept busy.
- Parameters:
enableBusyWait - whether to enable busy wait
- Returns:
- the client builder instance
-
openTelemetry
Configure OpenTelemetry for the Pulsar client.
When you pass an OpenTelemetry instance, the Pulsar client will emit metrics that can be exported in a variety of ways.
Refer to the OpenTelemetry Java SDK documentation for how to configure OpenTelemetry and the metrics exporter.
By default, the Pulsar client will use the GlobalOpenTelemetry instance. If an OpenTelemetry JVM agent is configured, the metrics will be reported; otherwise the metrics will be completely disabled.
- Parameters:
openTelemetry - the OpenTelemetry instance
- Returns:
- the client builder instance
-
clock
The clock used by the Pulsar client.
The clock is currently used by the producer for setting publish timestamps. Clock.millis() is called to retrieve the current timestamp as the publish timestamp when producers produce messages. The default clock is a system default zone clock, so the publish timestamp is the same as calling System.currentTimeMillis().
Warning: the clock is used for TTL enforcement and timestamp-based seeks, so be aware of the impact if you are going to use a different clock.
- Parameters:
clock - the clock used by the Pulsar client to retrieve time information
- Returns:
- the client builder instance
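To see why a non-default clock deserves care, the JDK-only sketch below compares the timestamp produced by a fixed clock with one produced by a clock offset by an hour. The helper publishTimestamp is hypothetical and simply calls Clock.millis(), which is the call the text above says the producer uses.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;

/** Illustrative only: a hypothetical helper, not part of the Pulsar client. */
final class ClockSketch {

    /** Stands in for how a publish timestamp is obtained from the configured clock. */
    static long publishTimestamp(Clock clock) {
        return clock.millis();
    }

    public static void main(String[] args) {
        Clock fixed = Clock.fixed(Instant.ofEpochMilli(1_000L), ZoneOffset.UTC);
        Clock skewed = Clock.offset(fixed, Duration.ofHours(1));

        System.out.println(publishTimestamp(fixed));  // prints 1000
        System.out.println(publishTimestamp(skewed)); // prints 3601000

        // The default described above is the system default zone clock.
        Clock systemClock = Clock.systemDefaultZone();
        System.out.println(publishTimestamp(systemClock) > 0); // prints true
    }
}
```

A clock that runs an hour ahead stamps every message an hour into the future, which is the kind of shift that affects TTL enforcement and timestamp-based seeks.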
-
proxyServiceUrl
Proxy service URL to use when the client connects to the broker via a proxy. The client can choose the type of proxy routing using ProxyProtocol.
- Parameters:
proxyServiceUrl - proxy service URL
proxyProtocol - protocol that decides the type of proxy routing, e.g. SNI routing
- Returns:
- the client builder instance
-
enableTransaction
If transactions are enabled, start the transactionCoordinatorClient with the Pulsar client.
- Parameters:
enableTransaction - whether to enable the transaction feature
- Returns:
- the client builder instance
-
dnsLookupBind
Set the DNS lookup bind address and port.
- Parameters:
address - dnsBindAddress
port - dnsBindPort
- Returns:
- the client builder instance
-
dnsServerAddresses
Set the DNS lookup server addresses.
- Parameters:
addresses - dnsServerAddresses
- Returns:
- the client builder instance
-
socks5ProxyAddress
Set the SOCKS5 proxy address.
- Parameters:
socks5ProxyAddress - the SOCKS5 proxy address
- Returns:
- the client builder instance
-
socks5ProxyUsername
Set the SOCKS5 proxy username.
- Parameters:
socks5ProxyUsername - the SOCKS5 proxy username
- Returns:
- the client builder instance
-
socks5ProxyPassword
Set the SOCKS5 proxy password.
- Parameters:
socks5ProxyPassword - the SOCKS5 proxy password
- Returns:
- the client builder instance
-
sslFactoryPlugin
Set the SSL Factory Plugin for a custom implementation that creates the SSLContext and SSLEngine.
- Parameters:
sslFactoryPlugin - SSL factory class name
- Returns:
- the client builder instance
-
sslFactoryPluginParams
Set the params for the SSL Factory Plugin to use.
- Parameters:
sslFactoryPluginParams - params in String format that will be passed to the SSL Factory Plugin
- Returns:
- the client builder instance
-
autoCertRefreshSeconds
Set the certificate refresh interval in seconds.
- Parameters:
autoCertRefreshSeconds - the certificate refresh interval in seconds
- Returns:
- the client builder instance
-
lookupProperties
Set the properties used for topic lookup.
When the broker performs topic lookup, these lookup properties will be taken into consideration in a customized load manager.
Note: The lookup properties are only used in topic lookup when:
- The protocol is the binary protocol, i.e. the service URL starts with "pulsar://" or "pulsar+ssl://"
- The `loadManagerClassName` config in the broker is a class that implements the `ExtensibleLoadManager` interface
-