【计算】MapReduce基础入门(三)

Posted by Kaka Blog on September 10, 2018

什么是MapReduce?

MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。

MR有两个阶段组成:Map和Reduce,用户只需实现map()和reduce()两个函数,即可实现分布式计算。

MapReduce执行流程

在Hadoop中,用于执行MapReduce任务的机器角色有两个:

  • JobTracker用于调度工作的,一个Hadoop集群中只有一个JobTracker,位于master。
  • TaskTracker用于执行工作,位于各slave上。

img

MapReduce运行机制

MapReduce的思想就是分而治之。MR程序的执行过程主要分为三步:Map阶段、Shuffle阶段、Reduce阶段,如下图:

img

在分布式计算中,MapReduce框架负责处理了并行编程中分布式存储、工作调度、负载均衡、容错均衡、容错处理以及网络通信等复杂问题,把处理过程高度抽象为两个函数:map和reduce,map负责把任务分解成多个任务,reduce负责把分解后多任务处理的结果汇总起来。

对于一个MR任务,它的输入、输出以及中间结果都是<key, value>键值对:

  • Map:<k1, v1>list(<k2, v2>)
  • Reduce:<k2, list(v2)>list(<k3, v3>)

shuffle机制是mapreduce整个处理过程中的核心机制,涉及到了分组、排序、数据缓存以及中间结果传递(map结果怎么交付给reduce)。

第一个MapReduce程序:WordCount

编写程序

1.添加依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.fang</groupId>
    <artifactId>WordCount</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>2.4.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>2.4.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-shuffle</artifactId>
            <version>2.4.1</version>
        </dependency>
    </dependencies>

</project>

2.实现Mapper

package com.fang.mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @Author: fwj
 * @Description:
 * @Date: Created in 2018/9/10 17:12
 * @Modified by:
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable one = new IntWritable(1);
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 读取一行,LongWritable表示偏移量,这里可以理解为第几行
        String line = value.toString();
        // 拆分单词
        String[] words = line.split(" ");
        // 遍历,输出list(<k2, v2>)
        for (String word : words) {
            context.write(new Text(word), one);
        }
    }
}

3.实现Reduce

package com.fang.mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @Author: fwj
 * @Description:
 * @Date: Created in 2018/9/10 17:22
 * @Modified by:
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // 统计个数
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

4.主程序

package com.fang.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * @Author: fwj
 * @Description:
 * @Date: Created in 2018/9/10 17:10
 * @Modified by:
 */
public class WordCount {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // 创建一个Job
        Job job = Job.getInstance(conf, "WordCount");
        // 设置Job运行的类
        job.setJarByClass(WordCount.class);
        // 设置Job的Mapper类
        job.setMapperClass(WordCountMapper.class);
        // 设置Job的Reduce类
        job.setReducerClass(WordCountReducer.class);
        // 设置文本输入类型
        job.setInputFormatClass(TextInputFormat.class);
        // 设置文本输出类型
        job.setOutputFormatClass(TextOutputFormat.class);
        // 设置文件输入路径
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // 设置文件输出路径
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 设置输出文件key, value的类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 启动任务
        job.waitForCompletion(true);
    }
}

生成jar

1、首先点开File文件下的Project Structure

2、选择Artifacts—->点击蓝色的“+”

3、选中jar—>From modules with dependencies

4、注意Main Class的添加,此处就是选择你要生成的jar包的工程文件

5、.MF文件就是你生成jar包生成的签名信息,第一次生成jar包,会生成相应的.MF签名文件,若第二次再生成jar包,会报错,说已经存在,只需将.MF文件删除即可

6、选择输出的目录,即Output Directory

7、勾选Build on make 点击ok

8、Build——>Make Artifacts…

上传文件到HDFS

hadoop-2.9.1/bin/hdfs dfs -put wordtest /

运行

1.运行程序

hadoop-2.9.1/bin/hadoop jar WordCountMR.jar com.fang.mapreduce.WordCount /wordtest /result

2.查看目录

hadoop-2.9.1/bin/hdfs dfs -ls /

3.查看结果

hadoop-2.9.1/bin/hdfs dfs -cat /result/*

img

可能遇到的问题

问题:

  1. java -jar 报错 Could not find or load main Class
  2. Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses.

解决:

  1. 生成jar时要选择Main-Class。
  2. 通过hadoop命令运行即可。

参考

MapReduce Tutorial

第一个MapReduce程序——WordCount