MapReduce in Practice

Posted by Kaka Blog on December 24, 2019

Preface

This case study walks through the workflow of statistical analysis in big data processing and the techniques involved.

Case Requirements Analysis

Case: statistical analysis of broadcast data from a live-streaming platform

Data file:

···
{"id":"158008300435","uid":"120010010445","nickname":"jack435","gold":445,"watchnumpy":4350,"watchnumuv":870,"hots":1305,"nofollower":435,"looktime":8700,"smlook":2175,"followe":1740,"gifter":870,"length":2610,"ares":"A_US","rating":"B","exp":1305,"type":"video_rating"}
···

Data Cleaning

Requirements

  1. From the raw data, keep only the required fields: streamer ID (uid), gold count (gold), total watch PV (watchnumpv), follower count (follower), and total broadcast duration (length)
  2. Validate these fields and discard any record that contains anomalous values

Analysis

  1. The raw data is in JSON format, so it can be parsed with fastjson
  2. No aggregation is required, so the map phase alone is enough and the reduce phase can be dropped
  3. The k1/v1 types of the map phase are fixed as <LongWritable, Text>; k2/v2 use <Text, Text>, where k2 holds the streamer ID and v2 holds the core fields separated by \t

Code Implementation

pom file:

<dependencies>
    ···
    <!-- hadoop-client dependency -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.2.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.54</version>
    </dependency>
    ···
</dependencies>
<build>
    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <!-- also bundle the dependency jars into the artifact -->
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-assembly-plugin</artifactId>
          <version>2.5.5</version>
          <configuration>
            <!-- get all project dependencies -->
            <descriptorRefs>
              <descriptorRef>jar-with-dependencies</descriptorRef>
            </descriptorRefs>
            <!-- MainClass in manifest makes an executable jar -->
            <archive>
              <manifest>
                <mainClass></mainClass>
              </manifest>
            </archive>
          </configuration>
          <executions>
            <execution>
              <id>make-assembly</id>
              <!-- bind to the packaging phase -->
              <phase>package</phase>
              <goals>
                <goal>single</goal>
              </goals>
            </execution>
          </executions>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>

1. Write the Mapper class

public class DataCleanMap extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        // parse one JSON record with fastjson
        JSONObject jsonObject = JSON.parseObject(line);
        String id = jsonObject.getString("uid");
        int gold = jsonObject.getIntValue("gold");
        int watchnumpv = jsonObject.getIntValue("watchnumpv");
        int follower = jsonObject.getIntValue("follower");
        int length = jsonObject.getIntValue("length");
        // keep the record only if all core metrics are valid (non-negative)
        if (gold >= 0 && watchnumpv >= 0 && follower >= 0 && length >= 0) {
            // assemble k2 (streamer ID) and v2 (core fields joined by \t)
            Text k2 = new Text(id);
            Text v2 = new Text(gold + "\t" + watchnumpv + "\t" + follower + "\t" + length);
            context.write(k2, v2);
        }
    }
}

2. Write the job class

public class DataCleanJob {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length != 2) {
            // exactly two arguments are required: input path and output path
            System.exit(100);
        }
        Configuration conf = new Configuration();
        // create a Job
        Job job = Job.getInstance(conf, "DataClean");
        // set the class used to locate the job jar
        job.setJarByClass(DataCleanJob.class);
        // set the job's Mapper class
        job.setMapperClass(DataCleanMap.class);
        // no Reducer class is set: this is a map-only job
        //job.setReducerClass(WordCountReducer.class);
        // set the input format to plain text
        job.setInputFormatClass(TextInputFormat.class);
        // set the output format to plain text
        job.setOutputFormatClass(TextOutputFormat.class);
        // set the input path
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // set the output path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // set the key/value types of the output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // disable the reduce phase
        job.setNumReduceTasks(0);
        // launch the job and wait for it to finish
        job.waitForCompletion(true);
    }
}

3. Package the project

mvn clean assembly:assembly -DskipTests

Packaging produces two jars: one that bundles the third-party dependencies and one that does not.

Open issue:

Normally mvn clean package should also produce MapReduceDemo-1.0-SNAPSHOT-jar-with-dependencies.jar, but here it does not, and it is not obvious why.
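
A likely explanation: in the pom above, maven-assembly-plugin is declared only inside <pluginManagement>, which pins its version and configuration but does not add it to the build, so its package-phase execution never runs during mvn clean package (whereas mvn assembly:assembly invokes the goal directly). As a sketch, also declaring the plugin under a regular <plugins> section should let mvn clean package build the jar-with-dependencies:

<build>
    <plugins>
        <!-- declared in <plugins> (not only in <pluginManagement>) so the
             package-phase execution configured above actually runs -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
        </plugin>
    </plugins>
</build>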

4. Upload the jar and the data file to the server

hdfs dfs -mkdir -p /data/videoinfo/20191225
hdfs dfs -put /home/bigdata/video_rating.log /data/videoinfo/20191225

5. Run the job

hadoop jar MapReduceDemo-1.0-SNAPSHOT-jar-with-dependencies.jar com.fang.dataClean.DataCleanJob /data/videoinfo/20191225 /result20191225

Note: the job may fail with java.lang.NoClassDefFoundError: com/alibaba/fastjson/JSON. Fix: bundle the dependency jars into the job jar, as configured in the pom.xml above.

Writing a Scheduled Task

1. Create a shell script

#!/bin/bash

# if no date argument is given, default to yesterday (yyyyMMdd)
if [ "X$1" = "X" ]
then
    yes_time=`date +%Y%m%d --date="1 days ago"`
else
    yes_time=$1
fi

cleanjob_input=/data/videoinfo/${yes_time}
cleanjob_output=/result/${yes_time}
jobs_home=/home/bigdata

# remove the output directory if it already exists
hdfs dfs -rm -r ${cleanjob_output}

# run the data clean job
hadoop jar ${jobs_home}/MapReduceDemo-1.0-SNAPSHOT-jar-with-dependencies.jar \
com.fang.dataClean.DataCleanJob ${cleanjob_input} ${cleanjob_output}

# check whether the job succeeded (the _SUCCESS marker exists)
hdfs dfs -ls ${cleanjob_output}/_SUCCESS
if [ "$?" = "0" ]
then
    echo "clean job execute success"
else
    echo "clean job execute failed"
fi
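
For a manual backfill, the script can also be run directly with an explicit date argument (yyyyMMdd), for example:

# re-run the clean job for a specific day instead of yesterday
sh /home/bigdata/dataClean.sh 20191225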

2. Add it to crontab (system crontab format, with hdfs as the run-as user)

30 00 * * * hdfs /bin/sh /home/bigdata/dataClean.sh >> /data/jobs/logs/dataClean.log

Exporting the Results to MySQL

1. Copy the MySQL driver mysql-connector-java-5.1.47.jar into the /opt/cloudera/parcels/CDH-5.15.1-1.cdh5.15.1.p0.4/lib/sqoop/lib directory

2. Create the database and the corresponding table
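
A minimal sketch of the table, assuming its columns simply mirror the five tab-separated fields produced by the clean job (uid, gold, watchnumpv, follower, length):

-- assumed schema: one column per tab-separated field in the cleaned output
CREATE DATABASE IF NOT EXISTS test1;
USE test1;
CREATE TABLE top10 (
    uid        VARCHAR(20),
    gold       INT,
    watchnumpv INT,
    follower   INT,
    length     INT
);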

3. Export the data to MySQL with Sqoop

sqoop export --connect jdbc:mysql://192.168.10.*:3306/test1?serverTimezone=UTC --username root --password *** --table top10 --export-dir /result/20191225 --input-fields-terminated-by "\t"

The export fails with:

Export job failed!
	at org.apache.sqoop.mapreduce.ExportJobBase.runExport(ExportJobBase.java:439)
	at org.apache.sqoop.manager.SqlManager.exportTable(SqlManager.java:930)
	at org.apache.sqoop.tool.ExportTool.exportTable(ExportTool.java:92)
	at org.apache.sqoop.tool.ExportTool.run(ExportTool.java:111)
	at org.apache.sqoop.Sqoop.run(Sqoop.java:147)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
	at org.apache.sqoop.Sqoop.runSqoop(Sqoop.java:183)
	at org.apache.sqoop.Sqoop.runTool(Sqoop.java:234)
	at org.apache.sqoop.Sqoop.runTool(Sqoop.java:243)
	at org.apache.sqoop.Sqoop.main(Sqoop.java:252)

This message does not reveal the real cause; the concrete error cannot be seen in the console output and has to be looked up in the CDH web console.

Sqoop ultimately runs the export as a MapReduce job, so the place to check is the YARN application details page for that job.
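
If you prefer the command line, the same task logs can usually be retrieved with the YARN CLI (assuming log aggregation is enabled), using the application ID of the failed Sqoop job:

# fetch the aggregated logs of the failed Sqoop MapReduce job
# (application ID derived from the failed task attempt attempt_1573534660588_0007_m_000000_0)
yarn logs -applicationId application_1573534660588_0007

Here the task log contains the fatal error: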

2019-12-26 16:55:43,326 FATAL [IPC Server handler 0 on 44674] org.apache.hadoop.mapred.TaskAttemptListenerImpl: Task: attempt_1573534660588_0007_m_000000_0 - exited : top10 : Unsupported major.minor version 52.0

A quick search showed the cause: CDH was configured with JDK 1.7 while the system (and the build) used JDK 1.8; class file version 52.0 corresponds to Java 8, hence the "Unsupported major.minor version 52.0" error. Update the Java directory for each host in the Cloudera Manager admin page, restart the cluster, and run the Sqoop export again; this time it completes successfully.

Finally, check that the data has arrived in the database.

References