Use filesystem offloader with Pulsar

This chapter guides you through every step of installing and configuring the filesystem offloader and using it with Pulsar.

Installation​

This section describes how to install the filesystem offloader.

Prerequisite​

  • Pulsar: 2.4.2 or later

Step​

This example uses Pulsar 2.5.1.

  1. Download the Pulsar tarball in one of the following ways:

    • Download the Pulsar tarball from the Apache mirror

    • Download the Pulsar tarball from the Pulsar download page

    • Use the wget command to download the Pulsar tarball.


    wget https://archive.apache.org/dist/pulsar/pulsar-2.5.1/apache-pulsar-2.5.1-bin.tar.gz

  2. Download and untar the Pulsar offloaders package.


    wget https://downloads.apache.org/pulsar/pulsar-2.5.1/apache-pulsar-offloaders-2.5.1-bin.tar.gz

    tar xvfz apache-pulsar-offloaders-2.5.1-bin.tar.gz

    note
    • If you run Pulsar in a bare metal cluster, ensure that the offloaders tarball is unzipped in every broker's Pulsar directory.
    • If you run Pulsar in Docker or deploy Pulsar using a Docker image (for example, on K8s or DCOS), you can use the apachepulsar/pulsar-all image, which already bundles the tiered storage offloaders.
  3. Move the offloaders directory into the Pulsar directory, then list its contents to confirm.


    mv apache-pulsar-offloaders-2.5.1/offloaders apache-pulsar-2.5.1/offloaders

    ls apache-pulsar-2.5.1/offloaders

    Output


    tiered-storage-file-system-2.5.1.nar
    tiered-storage-jcloud-2.5.1.nar


Configuration​

note

Before offloading data from BookKeeper to filesystem, you need to configure some properties of the filesystem offloader driver.

You can also configure the filesystem offloader to run automatically, or trigger it manually.

Configure filesystem offloader driver​

You can configure the filesystem offloader driver in the broker.conf or standalone.conf configuration file.

  • Required configurations are as below.

    | Parameter | Description | Example value |
    | --- | --- | --- |
    | managedLedgerOffloadDriver | Offloader driver name, which is case-insensitive. | filesystem |
    | fileSystemURI | Connection address, which is the URI to access the default Hadoop distributed file system. | hdfs://127.0.0.1:9000 |
    | fileSystemProfilePath | Hadoop profile path. The configuration file is stored in the Hadoop profile path. It contains various settings for Hadoop performance tuning. | conf/filesystem_offload_core_site.xml |
  • Optional configurations are as below.

    | Parameter | Description | Example value |
    | --- | --- | --- |
    | managedLedgerMinLedgerRolloverTimeMinutes | Minimum time between ledger rollover for a topic. Note: it is not recommended to set this parameter in the production environment. | 2 |
    | managedLedgerMaxEntriesPerLedger | Maximum number of entries to append to a ledger before triggering a rollover. Note: it is not recommended to set this parameter in the production environment. | 5000 |

Run filesystem offloader automatically​

You can configure the namespace policy to offload data automatically once a threshold is reached. The threshold is based on the size of data that a topic has stored on a Pulsar cluster. Once the topic storage reaches the threshold, an offload operation is triggered automatically.

| Threshold value | Action |
| --- | --- |
| > 0 | It triggers the offloading operation if the topic storage reaches its threshold. |
| = 0 | It causes a broker to offload data as soon as possible. |
| < 0 | It disables automatic offloading operation. |

Automatic offload runs when a new segment is added to a topic log. If you set the threshold on a namespace but few messages are being produced to the topic, the filesystem offloader does not run until the current segment is full.
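The threshold semantics can be summarized in a few lines of shell. This is only an illustrative sketch: `offload_action` is a hypothetical helper that restates the rules described here for a threshold given in bytes, and it does not call Pulsar.

```shell
# Hypothetical helper: describe what a given offload threshold (in bytes) means.
# It restates the documented rules only; it does not talk to a broker.
offload_action() {
  if [ "$1" -gt 0 ]; then
    echo "offload once topic storage reaches $1 bytes"
  elif [ "$1" -eq 0 ]; then
    echo "offload as soon as possible"
  else
    echo "automatic offloading disabled"
  fi
}

offload_action 10485760
offload_action 0
offload_action -1
```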

You can configure the threshold using CLI tools, such as pulsar-admin.

Example​

This example sets the filesystem offloader threshold to 10 MB using pulsar-admin.


pulsar-admin namespaces set-offload-threshold --size 10M my-tenant/my-namespace

tip

For more information about the pulsar-admin namespaces set-offload-threshold options command, including flags, descriptions, default values, and shorthands, see here.

Run filesystem offloader manually​

For individual topics, you can trigger the filesystem offloader manually using one of the following methods:

  • Use the REST endpoint.

  • Use CLI tools (such as pulsar-admin).

To manually trigger the filesystem offloader via CLI tools, you need to specify the maximum amount of data (threshold) that should be retained on a Pulsar cluster for a topic. If the size of the topic data on the Pulsar cluster exceeds this threshold, segments from the topic are offloaded to the filesystem until the threshold is no longer exceeded. Older segments are offloaded first.
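To make the "older segments first" rule concrete, the sketch below models it in plain shell. It is a simplified model of the behavior described in this section, not Pulsar's implementation: `select_for_offload` is a hypothetical function that takes a threshold in bytes followed by segment sizes listed oldest first, and prints the positions of the segments that would be offloaded.

```shell
# Simplified model, not Pulsar's implementation: given a threshold in bytes and
# segment sizes listed oldest first, print the 1-based positions of the
# segments that would be offloaded until the threshold is no longer exceeded.
select_for_offload() (
  threshold="$1"; shift
  total=0
  for size in "$@"; do total=$((total + size)); done
  index=1
  for size in "$@"; do
    # Stop as soon as the data left on the cluster fits under the threshold.
    [ "$total" -le "$threshold" ] && break
    echo "$index"
    total=$((total - size))
    index=$((index + 1))
  done
)

# Three segments of 60, 50 and 40 bytes with a 50-byte threshold:
# the two oldest segments are selected, leaving 40 bytes on the cluster.
select_for_offload 50 60 50 40
```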

Example​

  • This example manually runs the filesystem offloader using pulsar-admin.


    pulsar-admin topics offload --size-threshold 10M persistent://my-tenant/my-namespace/topic1

    Output


    Offload triggered for persistent://my-tenant/my-namespace/topic1 for messages before 2:0:-1

    tip

    For more information about the pulsar-admin topics offload options command, including flags, descriptions, default values, and shorthands, see here.

  • This example checks filesystem offloader status using pulsar-admin.


    pulsar-admin topics offload-status persistent://my-tenant/my-namespace/topic1

    Output


    Offload is currently running

    To wait for the filesystem offloader to complete the job, add the -w flag.


    pulsar-admin topics offload-status -w persistent://my-tenant/my-namespace/topic1

    Output


    Offload was a success

    If there is an error in the offloading operation, the error is propagated to the pulsar-admin topics offload-status command.


    pulsar-admin topics offload-status persistent://my-tenant/my-namespace/topic1

    Output


    Error in offload
    null

    Reason: Error offloading: org.apache.bookkeeper.mledger.ManagedLedgerException: java.util.concurrent.CompletionException: com.amazonaws.services.s3.model.AmazonS3Exception: Anonymous users cannot initiate multipart uploads. Please authenticate. (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: 798758DE3F1776DF; S3 Extended Request ID: dhBFz/lZm1oiG/oBEepeNlhrtsDlzoOhocuYMpKihQGXe6EG8puRGOkK6UwqzVrMXTWBxxHcS+g=), S3 Extended Request ID: dhBFz/lZm1oiG/oBEepeNlhrtsDlzoOhocuYMpKihQGXe6EG8puRGOkK6UwqzVrMXTWBxxHcS+g=

    tip

    For more information about the pulsar-admin topics offload-status options command, including flags, descriptions, default values, and shorthands, see here.
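The trigger and status commands above can be combined into one step for scripting. The following is a minimal sketch under stated assumptions: `offload_and_wait` is a hypothetical wrapper, `PULSAR_ADMIN` is an assumed variable pointing at your pulsar-admin binary, and success is detected by matching the "Offload was a success" message shown above rather than by relying on the command's exit code.

```shell
# PULSAR_ADMIN is an assumption: point it at your pulsar-admin binary.
PULSAR_ADMIN="${PULSAR_ADMIN:-pulsar-admin}"

# Hypothetical wrapper: trigger an offload, block until it finishes, and
# return 0 only if the status output reports success.
offload_and_wait() (
  topic="$1"
  threshold="$2"
  "$PULSAR_ADMIN" topics offload --size-threshold "$threshold" "$topic" || exit 1
  status="$("$PULSAR_ADMIN" topics offload-status -w "$topic" 2>&1)"
  echo "$status"
  case "$status" in
    *"Offload was a success"*) exit 0 ;;
    *) exit 1 ;;
  esac
)
```

For example, `offload_and_wait persistent://my-tenant/my-namespace/topic1 10M` prints the command output and returns a non-zero status if the offload did not succeed.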

Tutorial​

This section provides step-by-step instructions on how to use the filesystem offloader to move data from Pulsar to Hadoop Distributed File System (HDFS) or Network File System (NFS).

To move data from Pulsar to HDFS, follow these steps.

Step 1: Prepare the HDFS environment​

This tutorial sets up a Hadoop single node cluster and uses Hadoop 3.2.1.

tip

For details about how to set up a Hadoop single node cluster, see here.

  1. Download and uncompress Hadoop 3.2.1.


    wget https://mirrors.bfsu.edu.cn/apache/hadoop/common/hadoop-3.2.1/hadoop-3.2.1.tar.gz

    tar -zxvf hadoop-3.2.1.tar.gz -C $HADOOP_HOME

  2. Configure Hadoop.


    <!-- $HADOOP_HOME/etc/hadoop/core-site.xml -->
    <configuration>
    <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:9000</value>
    </property>
    </configuration>

    <!-- $HADOOP_HOME/etc/hadoop/hdfs-site.xml -->
    <configuration>
    <property>
    <name>dfs.replication</name>
    <value>1</value>
    </property>
    </configuration>

  3. Set up passphraseless SSH.


    # Now check that you can ssh to the localhost without a passphrase:
    $ ssh localhost
    # If you cannot ssh to localhost without a passphrase, execute the following commands
    $ ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
    $ cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
    $ chmod 0600 ~/.ssh/authorized_keys

  4. Start HDFS.


    # Format the NameNode only once. Reformatting gives the NameNode a new clusterID that no longer matches the one stored by the DataNode.
    $HADOOP_HOME/bin/hadoop namenode -format
    $HADOOP_HOME/sbin/start-dfs.sh

  5. Navigate to the HDFS website.

    You can see the Overview page.

    1. At the top navigation bar, click Datanodes to check DataNode information.

    2. Click HTTP Address to get more detailed information about localhost:9866.

      The Capacity Used value is 4 KB, which is the initial value.

Step 2: Install the filesystem offloader​

For details, see installation.

Step 3: Configure the filesystem offloader​

As indicated in the configuration section, you need to configure some properties for the filesystem offloader driver before using it. This tutorial assumes that you have configured the filesystem offloader driver as below and run Pulsar in standalone mode.

Set the following configurations in the conf/standalone.conf file.


managedLedgerOffloadDriver=filesystem
fileSystemURI=hdfs://127.0.0.1:9000
fileSystemProfilePath=conf/filesystem_offload_core_site.xml

note

For testing purposes, you can set the following two configurations to speed up ledger rollover, but it is not recommended that you set them in the production environment.


managedLedgerMinLedgerRolloverTimeMinutes=1
managedLedgerMaxEntriesPerLedger=100
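Before starting Pulsar, it can help to confirm that the three driver settings are actually present in the configuration file. The sketch below is one way to do that: `check_offload_conf` is a hypothetical helper, not part of Pulsar, and it assumes plain key=value lines as in conf/standalone.conf.

```shell
# Hypothetical helper (not part of Pulsar): verify that the filesystem
# offloader driver settings are present and non-empty in a conf file.
# Assumes plain key=value lines, as in conf/standalone.conf.
check_offload_conf() (
  conf="$1"
  missing=0
  for key in managedLedgerOffloadDriver fileSystemURI fileSystemProfilePath; do
    if ! grep -Eq "^${key}=.+" "$conf"; then
      echo "missing or empty: $key"
      missing=1
    fi
  done
  exit "$missing"
)
```

Running `check_offload_conf conf/standalone.conf` prints one line per missing setting and returns a non-zero status if any is absent.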

Step 4: Offload data from BookKeeper to filesystem​

Execute the following commands in the directory where you extracted the Pulsar tarball. For example, ~/path/to/apache-pulsar-2.5.1.

  1. Start Pulsar standalone.


    ./bin/pulsar standalone -a 127.0.0.1

  2. To ensure that the generated data is not deleted immediately, set a retention policy, which can be either a size limit or a time limit. The larger the value you set for the retention policy, the longer the data is retained.


    ./bin/pulsarctl namespaces set-retention public/default --size 100M --time 2d

    tip

    For more information about the pulsarctl namespaces set-retention options command, including flags, descriptions, default values, and shorthands, see here.

  3. Produce data using pulsar-client.


    ./bin/pulsar-client produce -m "Hello FileSystem Offloader" -n 1000 public/default/fs-test

  4. The offloading operation starts after a ledger rollover is triggered. To make sure that data is offloaded successfully, wait until several ledger rollovers have been triggered. In this case, you might need to wait a moment. You can check the ledger status using pulsarctl.


    ./bin/pulsarctl topics internal-stats public/default/fs-test

    Output

    The data of the ledger 696 is not offloaded.


    {
    "version": 1,
    "creationDate": "2020-06-16T21:46:25.807+08:00",
    "modificationDate": "2020-06-16T21:46:25.821+08:00",
    "ledgers": [
    {
    "ledgerId": 696,
    "isOffloaded": false
    }
    ],
    "cursors": {}
    }

  5. Wait a second and send more messages to the topic.


    ./bin/pulsar-client produce -m "Hello FileSystem Offloader" -n 1000 public/default/fs-test

  6. Check the ledger status using pulsarctl.


    ./bin/pulsarctl topics internal-stats public/default/fs-test

    Output

    The ledger 696 is rolled over.


    {
    "version": 2,
    "creationDate": "2020-06-16T21:46:25.807+08:00",
    "modificationDate": "2020-06-16T21:48:52.288+08:00",
    "ledgers": [
    {
    "ledgerId": 696,
    "entries": 1001,
    "size": 81695,
    "isOffloaded": false
    },
    {
    "ledgerId": 697,
    "isOffloaded": false
    }
    ],
    "cursors": {}
    }

  7. Trigger the offloading operation manually using pulsarctl.


    ./bin/pulsarctl topic offload -s 0 public/default/fs-test

    Output

    Data in ledgers before ledger 697 is offloaded.


    # offload info, the ledgers before 697 will be offloaded
    Offload triggered for persistent://public/default/fs-test for messages before 697:0:-1

  8. Check the ledger status using pulsarctl.


    ./bin/pulsarctl topic internal-info public/default/fs-test

    Output

    The data of the ledger 696 is offloaded.


    {
    "version": 4,
    "creationDate": "2020-06-16T21:46:25.807+08:00",
    "modificationDate": "2020-06-16T21:52:13.25+08:00",
    "ledgers": [
    {
    "ledgerId": 696,
    "entries": 1001,
    "size": 81695,
    "isOffloaded": true
    },
    {
    "ledgerId": 697,
    "isOffloaded": false
    }
    ],
    "cursors": {}
    }

On the HDFS Datanodes page, Capacity Used has changed from 4 KB to 116.46 KB.
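When a topic has many ledgers, scanning the JSON by eye gets tedious. The sketch below filters the ledger status output down to the IDs of offloaded ledgers. `offloaded_ledgers` is a hypothetical helper that assumes one field per line, as in the output shown in this tutorial; for any other layout, a real JSON parser is the safer choice.

```shell
# Hypothetical helper: read ledger status JSON on stdin and print the IDs of
# ledgers whose isOffloaded flag is true. Assumes one field per line, as in
# the pretty-printed output in this tutorial.
offloaded_ledgers() {
  awk '
    /"ledgerId"/    { id = $0; gsub(/[^0-9]/, "", id) }
    /"isOffloaded"/ { if ($0 ~ /true/) print id }
  '
}

# Sample input taken from the tutorial output.
offloaded_ledgers <<'EOF'
{
"version": 4,
"ledgers": [
{
"ledgerId": 696,
"entries": 1001,
"size": 81695,
"isOffloaded": true
},
{
"ledgerId": 697,
"isOffloaded": false
}
],
"cursors": {}
}
EOF
```

In practice, the input would be piped from the ledger status command used in step 8.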