Skip to main content

Extending Authentication and Authorization in Pulsar

Pulsar provides a way to use custom authentication and authorization mechanisms.

Authentication​

Pulsar supports mutual TLS and Athenz authentication plugins. For how to use these authentication plugins, you can refer to the description in Security.

You can choose to use a custom authentication mechanism by providing the implementation in the form of two plugins. One plugin is for the Client library and the other plugin is for the Pulsar Broker to validate the credentials.

Client authentication plugin​

For client library, you need to implement org.apache.pulsar.client.api.Authentication. By entering the command below you can pass this class when you create a Pulsar client:


PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.authentication(new MyAuthentication())
.build();

You can use 2 interfaces to implement on the client side:

This in turn needs to provide the client credentials in the form of org.apache.pulsar.client.api.AuthenticationDataProvider. This leaves the chance to return different kinds of authentication token for different types of connection or by passing a certificate chain to use for TLS.

You can find examples for client authentication providers at:

Broker authentication plugin​

On broker side, you need the corresponding plugin to validate the credentials that the client passes. Broker can support multiple authentication providers at the same time.

In conf/broker.conf you can choose to specify a list of valid providers:


# Authentication provider name list, which is comma separated list of class names
authenticationProviders=

To implement org.apache.pulsar.broker.authentication.AuthenticationProvider on one single interface:


/**
* Provider of authentication mechanism
*/
public interface AuthenticationProvider extends Closeable {

/**
* Perform initialization for the authentication provider
*
* @param config
* broker config object
* @throws IOException
* if the initialization fails
*/
void initialize(ServiceConfiguration config) throws IOException;

/**
* @return the authentication method name supported by this provider
*/
String getAuthMethodName();

/**
* Validate the authentication for the given credentials with the specified authentication data
*
* @param authData
* provider specific authentication data
* @return the "role" string for the authenticated connection, if the authentication was successful
* @throws AuthenticationException
* if the credentials are not valid
*/
String authenticate(AuthenticationDataSource authData) throws AuthenticationException;

}

The following is the example for Broker authentication plugins:

Authorization​

Authorization is the operation that checks whether a particular "role" or "principal" has a permission to perform a certain operation.

By default, Pulsar provides an embedded authorization, though configuring a different one through a plugin is also an alternative choice.

To provide a custom provider, you need to implement the org.apache.pulsar.broker.authorization.AuthorizationProvider interface, put this class in the Pulsar broker classpath and configure the class in conf/broker.conf:


# Authorization provider fully qualified class-name
authorizationProvider=org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider


/**
* Provider of authorization mechanism
*/
public interface AuthorizationProvider extends Closeable {

/**
* Perform initialization for the authorization provider
*
* @param config
* broker config object
* @param configCache
* pulsar zk configuration cache service
* @throws IOException
* if the initialization fails
*/
void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException;

/**
* Check if the specified role has permission to send messages to the specified fully qualified topic name.
*
* @param topicName
* the fully qualified topic name associated with the topic.
* @param role
* the app id used to send messages to the topic.
*/
CompletableFuture<Boolean> canProduceAsync(TopicName topicName, String role,
AuthenticationDataSource authenticationData);

/**
* Check if the specified role has permission to receive messages from the specified fully qualified topic name.
*
* @param topicName
* the fully qualified topic name associated with the topic.
* @param role
* the app id used to receive messages from the topic.
* @param subscription
* the subscription name defined by the client
*/
CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String role,
AuthenticationDataSource authenticationData, String subscription);

/**
* Check whether the specified role can perform a lookup for the specified topic.
*
* For that the caller needs to have producer or consumer permission.
*
* @param topicName
* @param role
* @return
* @throws Exception
*/
CompletableFuture<Boolean> canLookupAsync(TopicName topicName, String role,
AuthenticationDataSource authenticationData);

/**
*
* Grant authorization-action permission on a namespace to the given client
*
* @param namespace
* @param actions
* @param role
* @param authDataJson
* additional authdata in json format
* @return CompletableFuture
* @completesWith <br />
* IllegalArgumentException when namespace not found<br />
* IllegalStateException when failed to grant permission
*/
CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions, String role,
String authDataJson);

/**
* Grant authorization-action permission on a topic to the given client
*
* @param topicName
* @param role
* @param authDataJson
* additional authdata in json format
* @return CompletableFuture
* @completesWith <br />
* IllegalArgumentException when namespace not found<br />
* IllegalStateException when failed to grant permission
*/
CompletableFuture<Void> grantPermissionAsync(TopicName topicName, Set<AuthAction> actions, String role,
String authDataJson);

}