Pulsar adaptor for Apache Spark
Pulsar 용 Spark Streaming 수신기는 Apache Spark Streaming이 Pulsar로부터 데이터를 수신 할 수 있도록하는 사용자 정의 수신기입니다.
애플리케이션은 Spark Streaming Pulsar 수신기를 통해 Resilient Distributed Dataset (Resilient Distributed Dataset) 형식의 데이터를 수신하고 다양한 방식으로 처리 할 수 있습니다.
전제 조건
수신자를 사용하려면 Java 구성에서 pulsar-spark 라이브러리
에 대한 디펜던시을 포함하십시오.
Maven
Maven을 사용하는 경우, 이 pom.xml
을 추가하십시오.
<!-- in your <properties> block -->
<pulsar.version>2.7.1</pulsar.version>
<!-- in your <dependencies> block -->
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-spark</artifactId>
<version>${pulsar.version}</version>
</dependency>
Gradle
Gradle을 사용하는 경우, 이 build.gradle
파일을 추가 하십시오.
def pulsarVersion = "2.7.1"
dependencies {
compile group: 'org.apache.pulsar', name: 'pulsar-spark', version: pulsarVersion
}
Usage
SparkStreamingPulsarReceiver
의 인스턴스를 JavaStreamingContext
의 receiverStream
메소드로 전달하십시오.
String serviceUrl = "pulsar://localhost:6650/";
String topic = "persistent://public/default/test_src";
String subs = "test_sub";
SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("Pulsar Spark Example");
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(60));
ConsumerConfigurationData<byte[]> pulsarConf = new ConsumerConfigurationData();
Set<String> set = new HashSet<>();
set.add(topic);
pulsarConf.setTopicNames(set);
pulsarConf.setSubscriptionName(subs);
SparkStreamingPulsarReceiver pulsarReceiver = new SparkStreamingPulsarReceiver(
serviceUrl,
pulsarConf,
new AuthenticationDisabled());
JavaReceiverInputDStream<byte[]> lineDStream = jsc.receiverStream(pulsarReceiver);
For a complete example, click here. In this example, the number of messages that contain the string "Pulsar" in received messages is counted.