Preface
Through a case study, this section walks through the workflow of statistical analysis in big-data processing and some of the techniques involved.
Requirements Analysis
Case: statistical analysis of broadcast (go-live) data from a live-streaming platform.
Sample data file:
```
{"id":"158008300435","uid":"120010010445","nickname":"jack435","gold":445,"watchnumpv":4350,"watchnumuv":870,"hots":1305,"nofollower":435,"looktime":8700,"smlook":2175,"follower":1740,"gifter":870,"length":2610,"area":"A_US","rating":"B","exp":1305,"type":"video_rating"}
```
Data Cleaning
Requirements
- From the raw data, extract the required fields: anchor ID (uid), gold count (gold), total view PV (watchnumpv), number of followers (follower), total broadcast duration (length)
- Validate these fields; if a record contains anomalous values, discard it
Analysis
- The raw data is in JSON format, so fastjson can be used to parse it
- No aggregation is needed, so a map phase alone is enough and the reduce phase can be dropped
- The data types of k1, v1 in the map phase are fixed as <LongWritable, Text>; k2, v2 are <Text, Text>, where k2 holds the anchor ID and v2 holds the core fields separated by \t (see the example below)
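For example, applied to the sample record above, the mapper would emit roughly k2 = "120010010445" and v2 = "445\t4350\t1740\t2610"; since this is a map-only job, TextOutputFormat writes them out as one tab-separated line:

```
120010010445	445	4350	1740	2610
```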
Implementation
pom.xml (hadoop-client is marked provided because the cluster supplies it at runtime, while fastjson is not and therefore has to be packed into the job jar, which is what the assembly plugin below is for):
<dependencies>
<!-- ... -->
<!-- hadoop-client dependency -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.2.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.54</version>
</dependency>
<!-- ... -->
</dependencies>
<build>
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
<plugins>
<!-- pack the dependency jars into the job jar as well -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.5.5</version>
<configuration>
<!-- get all project dependencies -->
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<!-- MainClass in manifest makes an executable jar -->
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<!-- bind to the packaging phase -->
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</pluginManagement>
</build>
1. Write the Mapper class
import java.io.IOException;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DataCleanMap extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        // parse the JSON record with fastjson
        JSONObject jsonObject = JSON.parseObject(line);
        String id = jsonObject.getString("uid");
        int gold = jsonObject.getIntValue("gold");
        int watchnumpv = jsonObject.getIntValue("watchnumpv");
        int follower = jsonObject.getIntValue("follower");
        int length = jsonObject.getIntValue("length");
        // keep only records whose numeric fields are all non-negative
        if (gold >= 0 && watchnumpv >= 0 && follower >= 0 && length >= 0) {
            // assemble k2 (anchor ID) and v2 (core fields, tab-separated)
            Text k2 = new Text(id);
            Text v2 = new Text(gold + "\t" + watchnumpv + "\t" + follower + "\t" + length);
            context.write(k2, v2);
        }
    }
}
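The mapper above assumes every input line is valid JSON; a blank or corrupted line would make JSON.parseObject throw (or return null) and fail the task attempt. If the raw logs can contain such lines, a defensive variant along these lines could skip and count them instead (a sketch only; the class name DataCleanMapSafe and the counter names are made up here for illustration):

```java
import java.io.IOException;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Defensive variant (sketch): skip and count bad lines instead of failing the task
public class DataCleanMapSafe extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        try {
            JSONObject jsonObject = JSON.parseObject(value.toString());
            if (jsonObject == null) {
                // blank line: fastjson returns null for an empty string
                context.getCounter("DataClean", "blank_lines").increment(1);
                return;
            }
            String id = jsonObject.getString("uid");
            int gold = jsonObject.getIntValue("gold");
            int watchnumpv = jsonObject.getIntValue("watchnumpv");
            int follower = jsonObject.getIntValue("follower");
            int length = jsonObject.getIntValue("length");
            if (id != null && gold >= 0 && watchnumpv >= 0 && follower >= 0 && length >= 0) {
                context.write(new Text(id),
                        new Text(gold + "\t" + watchnumpv + "\t" + follower + "\t" + length));
            }
        } catch (JSONException e) {
            // corrupted JSON: drop the record and count it for later inspection
            context.getCounter("DataClean", "malformed_lines").increment(1);
        }
    }
}
```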
2. Write the driver (job) class
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class DataCleanJob {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length != 2) {
            // expect exactly two arguments: <input path> <output path>
            System.exit(100);
        }
        Configuration conf = new Configuration();
        // create the Job
        Job job = Job.getInstance(conf, "DataClean");
        // set the class that carries the job
        job.setJarByClass(DataCleanJob.class);
        // set the Mapper class
        job.setMapperClass(DataCleanMap.class);
        // no Reducer class is set: this is a map-only job
        //job.setReducerClass(WordCountReducer.class);
        // text input format
        job.setInputFormatClass(TextInputFormat.class);
        // text output format
        job.setOutputFormatClass(TextOutputFormat.class);
        // input path
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // output path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // key/value types of the output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // disable the reduce phase
        job.setNumReduceTasks(0);
        // run the job
        job.waitForCompletion(true);
    }
}
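A small optional refinement (not in the original code): waitForCompletion(true) returns whether the job succeeded, so the driver can expose that as its process exit code, letting shell callers rely on $? in addition to the _SUCCESS marker checked by the script further below:

```java
// exit with 0 on success and 1 on failure so that callers can check $?
System.exit(job.waitForCompletion(true) ? 0 : 1);
```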
3. Package
mvn clean assembly:assembly -DskipTests
Packaging produces two jars: one that bundles the third-party dependencies (the -jar-with-dependencies one, used below) and one that does not.
Open question:
Normally, mvn clean package should also produce
MapReduceDemo-1.0-SNAPSHOT-jar-with-dependencies.jar,
but here it does not.
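This is most likely because the maven-assembly-plugin above sits only under <pluginManagement>, which pins versions and configuration but does not by itself add the plugin to the build, so mvn package never runs its single goal (invoking assembly:assembly directly works regardless). Declaring the plugin under <build><plugins> should make mvn clean package produce the fat jar too; a sketch of the change, reusing the configuration shown earlier:

```xml
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-assembly-plugin</artifactId>
      <version>2.5.5</version>
      <configuration>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
      </configuration>
      <executions>
        <execution>
          <id>make-assembly</id>
          <phase>package</phase>
          <goals>
            <goal>single</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
```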
4. Upload the jar and the data file to the server
hdfs dfs -mkdir -p /data/videoinfo/20191225
hdfs dfs -put /home/bigdata/video_rating.log /data/videoinfo/20191225
5. Run the job
hadoop jar MapReduceDemo-1.0-SNAPSHOT-jar-with-dependencies.jar com.fang.dataClean.DataCleanJob /data/videoinfo/20191225 /result20191225
Note: the run may fail with java.lang.NoClassDefFoundError: com/alibaba/fastjson/JSON. Fix: the dependency jars need to be packed into the job jar as well; see the pom.xml above.
Schedule the job
1. Create a shell script
#!/bin/bash
# if the user did not pass a date, default to yesterday
if [ "X$1" = "X" ]
then
yes_time=`date +%Y%m%d --date="1 days ago"`
else
yes_time=$1
fi
cleanjob_input=/data/videoinfo/${yes_time}
cleanjob_output=/result/${yes_time}
jobs_home=/home/bigdata
# remove the output directory (the job fails if it already exists)
hdfs dfs -rm -r ${cleanjob_output}
# run the cleaning job
hadoop jar ${jobs_home}/MapReduceDemo-1.0-SNAPSHOT-jar-with-dependencies.jar \
com.fang.dataClean.DataCleanJob ${cleanjob_input} ${cleanjob_output}
# check whether the job succeeded (_SUCCESS exists only if it did)
hdfs dfs -ls ${cleanjob_output}/_SUCCESS
if [ "$?" = "0" ]
then
echo "clean job execute success"
else
echo "clean job execute failed"
fi
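Since the script takes an optional date as its first argument, it can also be re-run by hand for a specific day, e.g. sh /home/bigdata/dataClean.sh 20191225; with no argument it defaults to yesterday's date.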
2. Add it to crontab (system crontab format with a user field, here hdfs; the job runs daily at 00:30 and appends its output under /data/jobs/logs/):
30 00 * * * hdfs /bin/sh /home/bigdata/dataClean.sh >> /data/jobs/logs/
Export the results to MySQL
1. Copy the MySQL driver mysql-connector-java-5.1.47.jar into the directory
/opt/cloudera/parcels/CDH-5.15.1-1.cdh5.15.1.p0.4/lib/sqoop/lib
2. Create the database and the corresponding table (its columns must line up with the five tab-separated fields of the cleaned output: uid, gold, watchnumpv, follower, length)
3. Use Sqoop to export the data into MySQL:
sqoop export --connect jdbc:mysql://192.168.10.*:3306/test1?serverTimezone=UTC --username root --password *** --table top10 --export-dir /result/20191225 --input-fields-terminated-by "\t"
An error occurs:
Export job failed!
at org.apache.sqoop.mapreduce.ExportJobBase.runExport(ExportJobBase.java:439)
at org.apache.sqoop.manager.SqlManager.exportTable(SqlManager.java:930)
at org.apache.sqoop.tool.ExportTool.exportTable(ExportTool.java:92)
at org.apache.sqoop.tool.ExportTool.run(ExportTool.java:111)
at org.apache.sqoop.Sqoop.run(Sqoop.java:147)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
at org.apache.sqoop.Sqoop.runSqoop(Sqoop.java:183)
at org.apache.sqoop.Sqoop.runTool(Sqoop.java:234)
at org.apache.sqoop.Sqoop.runTool(Sqoop.java:243)
at org.apache.sqoop.Sqoop.main(Sqoop.java:252)
This message alone does not name the actual problem; the detailed error is not visible on the console and has to be looked up in the CDH web UI.
Sqoop is ultimately translated into a MapReduce job, so to see how the Sqoop task really ran you need to go to the YARN job details page:
2019-12-26 16:55:43,326 FATAL [IPC Server handler 0 on 44674] org.apache.hadoop.mapred.TaskAttemptListenerImpl: Task: attempt_1573534660588_0007_m_000000_0 - exited : top10 : Unsupported major.minor version 52.0
After searching around, it turned out that CDH was running on JDK 1.7 while the jar had been built with the JDK 1.8 installed on the system ("Unsupported major.minor version 52.0" means a class compiled for Java 8 was run on an older JVM). In the Cloudera Manager admin page, change the Java home directory of each host (pointing it at the JDK 1.8 installation), restart the cluster, and rerun the Sqoop export, which then completes successfully.
Finally, check that the data has arrived in the MySQL table.