Skip to main content

Pulsar plugin development

You can develop various plugins for Pulsar, such as entry filters, protocol handlers, interceptors, and so on.

Additional Servlets​

This chapter describes what additional servlets are and how to use them.

What is an additional servlet?​

Pulsar offers a multitude of REST APIs to interact with it. To expose additional custom logic as a REST API, Pulsar offers the concept of additional servlets. These servlets run as plugins in either the broker or the pulsar proxy.

How to use an additional servlet?​

Take a look at this example implementation, or follow the steps below:

  1. Create a Maven project.

  2. Implement the AdditionalServlet or AdditionalServletWithPulsarService interface. Use AdditionalServletWithPulsarService, if you need access to Pulsar internals for performing administrative tasks or producing messages.

  3. Package your project into a NAR file.

  4. Configure the broker.conf file (or the standalone.conf file) and restart your broker.

Step 1: Create a Maven project​

For how to create a Maven project, see here.

Step 2: Implement the AdditionalServlet interface​

  1. Add a dependency for pulsar-broker in the pom.xml file as displayed. Otherwise, you can not find the AdditionalServlet interface.

    <dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-broker</artifactId>
    <version>${pulsar.version}</version>
    <scope>provided</scope>
    </dependency>
  2. Implement the methods of the AdditionalServlet interface.

    • loadConfig allows you to configure your servlet by loading configuration properties from the PulsarConfiguration.

    • getBasePath defines the path your servlet will be loaded under.

    • getServletHolder returns the ServletHolder for this servlet.

    • close allows you to free up resources.

  3. Describe a NAR file.

    Create an additional_servlet.yml file in the resources/META-INF/services directory to describe a NAR file.

    name: my-servlet
    description: Describes my-servlet
    additionalServletClass: org.my.package.MyServlet

Step 3: package your project into a NAR file​

  1. Add the compiled plugin of the NAR file to your pom.xml file.

    <build>
    <finalName>${project.artifactId}</finalName>
    <plugins>
    <plugin>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-nar-maven-plugin</artifactId>
    <version>1.5.0</version>
    <extensions>true</extensions>
    <configuration>
    <finalName>${project.artifactId}-${project.version}</finalName>
    </configuration>
    <executions>
    <execution>
    <id>default-nar</id>
    <phase>package</phase>
    <goals>
    <goal>nar</goal>
    </goals>
    </execution>
    </executions>
    </plugin>
    </plugins>
    </build>
  2. Generate a NAR file in the target directory.

    mvn clean install

Step 4: configure and restart broker​

  1. Configure the following parameters in the broker.conf file (or the standalone.conf file).

    # Name of pluggable additional servlets
    # Multiple servlets need to be separated by commas.
    additionalServlets=my-servlet
    # The directory for all additional servlet implementations
    additionalServletDirectory=tempDir
  2. Restart your broker.

    You can see the following broker log if the plug-in is successfully loaded.

    Successfully loaded additional servlet for name `my-servlet`

Entry filter​

This chapter describes what the entry filter is and shows how to use the entry filter.

What is an entry filter?​

The entry filter is an extension point for implementing a custom message entry strategy. With an entry filter, you can decide whether to send messages to consumers (brokers can use the return values of entry filters to determine whether the messages need to be sent or discarded) or send messages to specific consumers.

To implement features such as tagged messages or custom delayed messages, use subscriptionProperties, ​​properties, and entry filters.

How to use an entry filter?​

Follow the steps below:

  1. Create a Maven project.

  2. Implement the EntryFilter interface.

  3. Package the implementation class into a NAR file.

  4. Configure the broker.conf file (or the standalone.conf file) and restart your broker.

Step 1: Create a Maven project​

For how to create a Maven project, see here.

Step 2: Implement the EntryFilter interface​

  1. Add a dependency for Pulsar broker in the pom.xml file to display. Otherwise, you can not find the EntryFilter interface.


    <dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-broker</artifactId>
    <version>${pulsar.version}</version>
    <scope>provided</scope>
    </dependency>

  2. Implement the FilterResult filterEntry(Entry entry, FilterContext context); method.

    • If the method returns ACCEPT or NULL, this message is sent to consumers.

    • If the method returns REJECT, this message is filtered out and it does not consume message permits.

    • If there are multiple entry filters, this message passes through all filters in the pipeline in a round-robin manner. If any entry filter returns REJECT, this message is discarded.

    You can get entry metadata, subscriptions, and other information through FilterContext.

  3. Describe a NAR file.

    Create an entry_filter.yml file in the resources/META-INF/services directory to describe a NAR file.


    # Entry filter name, which should be configured in the broker.conf file later
    name: entryFilter
    # Entry filter description
    description: entry filter
    # Implementation class name of entry filter
    entryFilterClass: com.xxxx.xxxx.xxxx.DefaultEntryFilterImpl

Step 3: package implementation class of entry filter into a NAR file​

  1. Add the compiled plugin of the NAR file to your pom.xml file.


    <build>
    <finalName>${project.artifactId}</finalName>
    <plugins>
    <plugin>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-nar-maven-plugin</artifactId>
    <version>1.5.0</version>
    <extensions>true</extensions>
    <configuration>
    <finalName>${project.artifactId}-${project.version}</finalName>
    </configuration>
    <executions>
    <execution>
    <id>default-nar</id>
    <phase>package</phase>
    <goals>
    <goal>nar</goal>
    </goals>
    </execution>
    </executions>
    </plugin>
    </plugins>
    </build>

  2. Generate a NAR file in the target directory.


    mvn clean install

Step 4: configure and restart broker​

  1. Configure the following parameters in the broker.conf file (or the standalone.conf file).


    # Class name of pluggable entry filters
    # Multiple classes need to be separated by commas.
    entryFilterNames=entryFilter1,entryFilter2,entryFilter3
    # The directory for all entry filter implementations
    entryFiltersDirectory=tempDir

  2. Restart your broker.

    You can see the following broker log if the plug-in is successfully loaded.


    Successfully loaded entry filter for name `{name of your entry filter}`