Pulsar adaptor for Apache Storm
Pulsar Storm is an adaptor for integrating with Apache Storm topologies. It provides core Storm implementations for sending and receiving data.
Une application peut injecter des données dans une topologie Storm via un spout Pulsar générique, ainsi que consommer des données provenant d'une topologie Storm via un bolt Pulsar générique.
Utilisation de l'adapteur Pulsar pour Apache Storm
Inclusion de dépendance pour l'adaptateur Pulsar Storm:
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-storm</artifactId>
<version>${pulsar.version}</version>
</dependency>
Spout Pulsar
Le spout Pulsar permet aux données publiées sur un sujet d'être utilisées par une topologie Storm. Il émet un tuple Storm basé sur le message reçu et le MessageToValuesMapper
fourni par le client.
Les tuples qui ne parviennent pas à être traités par les bolts en aval seront réinjectés par le spout avec une temporisation exponentielle, dans un délai configurable (le délai par défaut est de 60 secondes) ou un nombre configurable d'essais, selon ce qui survient en premier, après quoi il sont acquités par le consommateur. Voici un exemple de déclaration d'un spout:
MessageToValuesMapper messageToValuesMapper = new MessageToValuesMapper() {
@Override
public Values toValues(Message msg) {
return new Values(new String(msg.getData()));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// declare the output fields
declarer.declare(new Fields("string"));
}
};
// Configuration d'un Spout Pulsar
PulsarSpoutConfiguration spoutConf = new PulsarSpoutConfiguration();
spoutConf.setServiceUrl("pulsar://broker.messaging.usw.example.com:6650");
spoutConf.setTopic("persistent://my-property/usw/my-ns/my-topic1");
spoutConf.setSubscriptionName("my-subscriber-name1");
spoutConf.setMessageToValuesMapper(messageToValuesMapper);
// Création d'un Spout Pulsar
PulsarSpout spout = new PulsarSpout(spoutConf);
For a complete example, click here.
Bolt Pulsar
The Pulsar bolt allows data in a Storm topology to be published on a topic. It publishes messages based on the Storm tuple received and the TupleToMessageMapper
provided by the client.
Un topic partitionné peut également être utilisé pour publier des messages sur différents topics. Dan l'implémentation du TupleToMessageMapper
, une "clé" devra être fournie dans le message qui enverra les messages avec la même clé au même topic. Voici un exemple de bolt:
TupleToMessageMapper tupleToMessageMapper = new TupleToMessageMapper() {
@Override
public TypedMessageBuilder<byte[]> toMessage(TypedMessageBuilder<byte[]> msgBuilder, Tuple tuple) {
String receivedMessage = tuple.getString(0);
// traitement du message
String processedMsg = receivedMessage + "-processed";
return msgBuilder.value(processedMsg.getBytes());
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// déclaration des champs en sortie
}
};
// Configuration d'un Bolt Pulsar
PulsarBoltConfiguration boltConf = new PulsarBoltConfiguration();
boltConf.setServiceUrl("pulsar://broker.messaging.usw.example.com:6650");
boltConf.setTopic("persistent://my-property/usw/my-ns/my-topic2");
boltConf.setTupleToMessageMapper(tupleToMessageMapper);
// Création d'un Bolt Pulsar
PulsarBolt bolt = new PulsarBolt(boltConf);