文章目录
一、MapReduce的定义二、MapReduce优缺点优点:缺点
三、MapReduce组成四、案例演示第一步、创建测试数据第二步、编写Mapper类第三步、编写Reducer类第四步、编写Driver类第五步、运行
五、MapReduce运行流程流程示意图-简略版(结合案例)流程示意图-详细版
一、MapReduce的定义
MapReduce是一个分布式运算程序的编程框架,核心功能是将用户编写的业务逻辑代码和自带默认组建整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上
二、MapReduce优缺点
优点:
易于编程,实现一些接口就可以完成一个分布式程序 良好的扩展性,可以通过简单的增加机器提高算力 高容错性,单个节点挂了不影响整体运行 适合PB级以上海量数据的离线处理
缺点
不擅长实时运算 不擅长流式运算,数据源必须是静态的
三、MapReduce组成
Mapper阶段 用户自定义的Mapper要继承自己的父类 Mapper的输入数据是KV对的形式 Mapper中的业务逻辑下载map()方法中 Mapper的输出数据是KV对的形式 map()方法对每一个KV对调用一次
Reduce阶段 用户自定义的Reducer要继承自己的父类 Reducer的输入数据类型对应Mapper的输出数据类型,也是KV Reducer的业务逻辑写在reduce()方法中 ReduceTask进程对每一组相同k的KV组调用一次reduce()方法
Driver阶段 用于提交整个程序到YARN集群,提交的是封装了MapReduce程序相关运行参数的job对象
四、案例演示
wordcount案例 现有一份数据,要求计算出数据中每个单词出现的次数 效果大致为:
第一步、创建测试数据
第二步、编写Mapper类
package a
.b
.mr
.wc
;
import org
.apache
.hadoop
.io
.IntWritable
;
import org
.apache
.hadoop
.io
.LongWritable
;
import org
.apache
.hadoop
.io
.Text
;
import org
.apache
.hadoop
.mapreduce
.Mapper
;
import java
.io
.IOException
;
public class WCMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
Text k
=new Text();
IntWritable v
=new IntWritable(1);
@Override
protected void map(LongWritable key
, Text value
, Context context
) throws IOException
, InterruptedException
{
String line
=value
.toString();
String
[] words
=line
.split("\\s+");
for (String word
:words
){
k
.set(word
);
context
.write(k
,v
);
}
}
}
第三步、编写Reducer类
package a
.b
.mr
.wc
;
import org
.apache
.hadoop
.io
.IntWritable
;
import org
.apache
.hadoop
.io
.LongWritable
;
import org
.apache
.hadoop
.io
.Text
;
import org
.apache
.hadoop
.mapreduce
.Reducer
;
import java
.io
.IOException
;
public class WCReducer extends Reducer<Text,IntWritable, Text, IntWritable> {
int sum
;
IntWritable v
=new IntWritable();
@Override
protected void reduce(Text key
, Iterable
<IntWritable> values
, Context context
) throws IOException
, InterruptedException
{
sum
=0;
for (IntWritable count
: values
) {
sum
+=count
.get();
}
v
.set(sum
);
context
.write(key
,v
);
}
}
第四步、编写Driver类
package a
.b
.mr
.wc
;
import org
.apache
.hadoop
.conf
.Configuration
;
import org
.apache
.hadoop
.fs
.Path
;
import org
.apache
.hadoop
.io
.IntWritable
;
import org
.apache
.hadoop
.io
.Text
;
import org
.apache
.hadoop
.mapreduce
.Job
;
import org
.apache
.hadoop
.mapreduce
.lib
.input
.FileInputFormat
;
import org
.apache
.hadoop
.mapreduce
.lib
.output
.FileOutputFormat
;
import java
.io
.IOException
;
public class WCDiver {
public static void main(String
[] args
) throws IOException
, ClassNotFoundException
, InterruptedException
{
Configuration conf
=new Configuration();
Job job
=Job
.getInstance(conf
,"wordcount");
job
.setJarByClass(WCDiver
.class);
job
.setMapperClass(WCMapper
.class);
job
.setReducerClass(WCReducer
.class);
job
.setMapOutputKeyClass(Text
.class);
job
.setMapOutputValueClass(IntWritable
.class);
job
.setOutputKeyClass(Text
.class);
job
.setOutputValueClass(IntWritable
.class);
FileInputFormat
.setInputPaths(job
,new Path("E:\\test\\input.txt"));
FileOutputFormat
.setOutputPath(job
,new Path("E:\\test\\output.txt"));
boolean result
=job
.waitForCompletion(true);
System
.exit(result
?0:1);
}
}
第五步、运行
得到输出文件:
五、MapReduce运行流程
首先先看一下流程示意图
流程示意图-简略版(结合案例)
简单来说: 第一步、InputFormat 1、这里默认使用了TextInputFormat中的getSplits方法对文件input.txt进行分片,这里就分了一个切片。 2、提交切片信息给到Yarn,计算出所需要的的Map Task数量,这里是一个。 3、调用RecorderReader的read()方法,读取文件,形成K.V的形式传入Map 第二步、Map接收InputFormat传来的文件,根据业务逻辑多数据进行计算,这里是根据空白字符切割,传向环形缓冲区,环形缓冲区达到80%时会溢写,传向Partitioner。 第三步、Combiner为每一个Map传出的文件进行局部小整合(注意Combiner并不是必须要有的,他的作用与最后的Reduce基本一致,在编写上代码也几乎相同,其作用是将Map传出的小文件,先整合成中文件,以减轻Reducer的压力,在数据量小的时候几乎没有效果。)(上诉案例没有用到Combiner) 第四步、Partitioner对传来的数据进行分区,对分区后的数据排序,分区决定了每个区块去往哪一个Reducer 第五步、Reducer对传来的数据进行最后的整合 第六步、输出文件
流程示意图-详细版