What is MapReduce?
MapReduce is a programming model for parallel computation over large data sets (larger than 1 TB).
An MR job consists of two phases, Map and Reduce; a user only needs to implement the map() and reduce() functions to get a distributed computation.
MapReduce Execution Flow
In Hadoop (classic MRv1), two machine roles execute MapReduce jobs:
- The JobTracker schedules jobs; a Hadoop cluster has exactly one JobTracker, running on the master.
- TaskTrackers execute the work; one runs on each slave.
How MapReduce Works
The idea behind MapReduce is divide and conquer. An MR program executes in three main phases: the Map phase, the Shuffle phase, and the Reduce phase.
In distributed computing, the MapReduce framework takes care of the hard parts of parallel programming: distributed storage, job scheduling, load balancing, fault tolerance, and network communication. It abstracts the processing into just two functions, map and reduce: map breaks a task into many sub-tasks, and reduce gathers and merges the results of those sub-tasks.
For an MR job, the input, the output, and the intermediate results are all <key, value> pairs:
- Map: <k1, v1> → list(<k2, v2>)
- Reduce: <k2, list(v2)> → list(<k3, v3>)
The shuffle mechanism is the core of the whole MapReduce process; it covers grouping, sorting, data caching, and the hand-off of intermediate results (that is, how map output gets delivered to reduce).
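To make the typing concrete with the WordCount example developed below: suppose the input file holds the two lines "hello world" and "hello hadoop". The pairs flow through the phases like this (the map input keys are the byte offsets of the lines):

Map input:      <0, "hello world">, <12, "hello hadoop">
Map output:     <"hello", 1>, <"world", 1>, <"hello", 1>, <"hadoop", 1>
After shuffle:  <"hadoop", list(1)>, <"hello", list(1, 1)>, <"world", list(1)>
Reduce output:  <"hadoop", 1>, <"hello", 2>, <"world", 1>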
Your First MapReduce Program: WordCount
Writing the Program
1. Add the dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.fang</groupId>
    <artifactId>WordCount</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>2.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>2.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-shuffle</artifactId>
            <version>2.9.1</version>
        </dependency>
    </dependencies>
</project>
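One thing worth noting about the POM: all Hadoop artifacts are kept on the same version, matching the Hadoop release installed on the cluster (2.9.1 throughout this walkthrough). Mixing Hadoop client versions is a common source of confusing runtime errors.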
2. Implement the Mapper
package com.fang.mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @Author: fwj
 * @Description: Mapper for WordCount
 * @Date: Created in 2018/9/10 17:12
 * @Modified by:
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable one = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Read one line; the LongWritable key is the byte offset of this line within the file
        String line = value.toString();
        // Split the line into words
        String[] words = line.split(" ");
        // Emit list(<k2, v2>): one (word, 1) pair per word
        for (String word : words) {
            context.write(new Text(word), one);
        }
    }
}
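An optional micro-optimization: map() runs once per input line, so allocating a new Text for every word creates needless garbage. A common pattern is to reuse a single Text field (a sketch; the version above is already correct):

    private final Text outKey = new Text();

    // inside map(), replacing the loop body:
    for (String word : words) {
        outKey.set(word);              // reuse one Text object
        context.write(outKey, one);    // the pair is serialized during write
    }

This is safe because the framework serializes the pair inside context.write(), so the same object can be reused on the next iteration.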
3. Implement the Reducer
package com.fang.mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @Author: fwj
 * @Description: Reducer for WordCount
 * @Date: Created in 2018/9/10 17:22
 * @Modified by:
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Sum all the counts emitted for this word
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
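Because integer addition is associative and commutative, this same reducer can double as a combiner, pre-aggregating counts on the map side to shrink the data shuffled across the network. If you want to enable that, one extra line in the driver below does it:

    job.setCombinerClass(WordCountReducer.class);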
4. Main program
package com.fang.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * @Author: fwj
 * @Description: WordCount driver
 * @Date: Created in 2018/9/10 17:10
 * @Modified by:
 */
public class WordCount {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Create a Job
        Job job = Job.getInstance(conf, "WordCount");
        // Locate the jar via the driver class
        job.setJarByClass(WordCount.class);
        // Set the Mapper class
        job.setMapperClass(WordCountMapper.class);
        // Set the Reducer class
        job.setReducerClass(WordCountReducer.class);
        // Read plain text input
        job.setInputFormatClass(TextInputFormat.class);
        // Write plain text output
        job.setOutputFormatClass(TextOutputFormat.class);
        // Input path (first command-line argument)
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // Output path (second command-line argument); must not exist yet
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Types of the output key and value
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Submit the job, wait for it, and exit non-zero on failure
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
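A practical note: FileOutputFormat refuses to start a job whose output directory already exists. For repeated test runs you can delete a stale output directory in the driver before submitting; a minimal optional sketch using the HDFS FileSystem API (requires an extra import of org.apache.hadoop.fs.FileSystem):

    FileSystem fs = FileSystem.get(conf);
    Path output = new Path(args[1]);
    if (fs.exists(output)) {
        // true = delete recursively
        fs.delete(output, true);
    }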
Generating the jar
1. Open File -> Project Structure.
2. Select Artifacts and click the blue "+".
3. Choose JAR -> From modules with dependencies.
4. Mind the Main Class field: pick the driver class for the jar you want to build.
5. The .MF file carries the manifest information generated for the jar. It is created on the first build; if you build the jar a second time you will get an "already exists" error, and simply deleting the .MF file fixes it.
6. Choose the output directory (Output Directory).
7. Check "Build on make" and click OK.
8. Build -> Build Artifacts...
Upload the input file to HDFS
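The wordtest below is assumed to be a local text file of space-separated words; if you do not have one handy, two echo lines produce a small test file:

echo "hello world" > wordtest
echo "hello hadoop" >> wordtest

Then push it to the HDFS root: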
hadoop-2.9.1/bin/hdfs dfs -put wordtest /
Running
1. Run the program
hadoop-2.9.1/bin/hadoop jar WordCountMR.jar com.fang.mapreduce.WordCount /wordtest /result
2. List the directory
hadoop-2.9.1/bin/hdfs dfs -ls /
3. View the results
hadoop-2.9.1/bin/hdfs dfs -cat /result/*
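With the two-line sample input from above, the result should look like this (TextOutputFormat separates key and value with a tab, and keys come out sorted):

hadoop	1
hello	2
world	1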
Possible problems
Problems:
- Running the jar with java -jar reports "Could not find or load main class".
- Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses.
Solutions:
- Select the Main-Class when generating the jar.
- Run the job with the hadoop command (hadoop jar ...) instead of java -jar.