Kafka整合Spark Streaming

Posted by Kaka Blog on April 2, 2019

概述

SparkStreaming整合Kafka有两种方式,一种是基于接收器的方法,另一种是直接方法(无接收器)。

Receiver方式:由Spark executors中的Receiver来接收kafka中的数据。 Direct方式:此方法不使用接收器接收数据,而是周期性查询Kafka中每个主题+分区中的最新偏移量,并相应地定义要在每批中处理的偏移量范围。处理数据的作业启动后,Kafka consumerAPI读取Kafka中定义的偏移量范围(类似于从文件系统读取文件)。

由于Direct相比Receiver有诸多优势:简化并行性、效率高等,因此我们选择Direct方式。

代码实现

功能介绍:对Kafka队列进行侦听,然后需要统计接收的文本数据中的每个单词的出现频率。

1、添加依赖

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
  • 注意版本问题,可能会出现NoClassDefFoundError kafka/serializer/StringDecoder问题

2、创建SparkStreamingKafkaTest

public class SparkStreamingKafkaTest {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("DirectKafkaWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
        Map<String, String> params = new HashMap<>();
        params.put("metadata.broker.list", "192.168.10.30:9092,192.168.10.33:9092,192.168.10.32:9092");
        Set<String> topics = new HashSet<>();
        topics.add("topic_1");
        JavaPairInputDStream<String, String> dStream = KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class,
                StringDecoder.class, params, topics);
        JavaDStream<String> words = dStream.flatMap(x -> Arrays.asList(x._2.split(" ")).iterator());
        JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);
        wordCounts.print();
        jssc.start();
        jssc.awaitTermination();
    }
}
  • 跟之前SparkStreaming的步骤差不多
  • JavaPairInputDStream取值时需要用x._2才能取到数据

测试

1、IDEA中运行sparkstreaming程序 2、生产者实时生产数据 bin/kafka-console-producer.sh --broker-list 398.cdh.master:9092,398.cdh .slave1:9092,398.cdh.slave2:9092 --topic topic_1 输入:

>hello world hello somebofy

3、可以看到IDEA的控制台中,spark实时处理了来自kafka的数据

···
(hello,2)
(somebofy,1)
(world,1)
···

参考