概述
SparkStreaming
整合Kafka
有两种方式,一种是基于接收器的方法,另一种是直接方法(无接收器)。
Receiver
方式:由Spark executors
中的Receiver
来接收kafka
中的数据。
Direct
方式:此方法不使用接收器接收数据,而是周期性查询Kafka
中每个主题+分区中的最新偏移量,并相应地定义要在每批中处理的偏移量范围。处理数据的作业启动后,Kafka consumerAPI
读取Kafka
中定义的偏移量范围(类似于从文件系统读取文件)。
由于Direct
相比Receiver
有诸多优势:简化并行性、效率高等,因此我们选择Direct
方式。
代码实现
功能介绍:对Kafka队列进行侦听,然后需要统计接收的文本数据中的每个单词的出现频率。
1、添加依赖
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.1.0</version>
</dependency>
- 注意版本问题,可能会出现
NoClassDefFoundError kafka/serializer/StringDecoder
问题
2、创建SparkStreamingKafkaTest
类
public class SparkStreamingKafkaTest {
public static void main(String[] args) throws InterruptedException {
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("DirectKafkaWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
Map<String, String> params = new HashMap<>();
params.put("metadata.broker.list", "192.168.10.30:9092,192.168.10.33:9092,192.168.10.32:9092");
Set<String> topics = new HashSet<>();
topics.add("topic_1");
JavaPairInputDStream<String, String> dStream = KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class,
StringDecoder.class, params, topics);
JavaDStream<String> words = dStream.flatMap(x -> Arrays.asList(x._2.split(" ")).iterator());
JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);
wordCounts.print();
jssc.start();
jssc.awaitTermination();
}
}
- 跟之前SparkStreaming的步骤差不多
JavaPairInputDStream
取值时需要用x._2
才能取到数据
测试
1、IDEA中运行sparkstreaming程序
2、生产者实时生产数据
bin/kafka-console-producer.sh --broker-list 398.cdh.master:9092,398.cdh
.slave1:9092,398.cdh.slave2:9092 --topic topic_1
输入:
>hello world hello somebofy
3、可以看到IDEA的控制台中,spark实时处理了来自kafka的数据
···
(hello,2)
(somebofy,1)
(world,1)
···