1,2,3,4 map task通过TextInputFormat(RecordReader>>read())读文件,一次读一行,返回(key,value)
5 上一步的到的(key,value)键值对经过mapper的map()方法逻辑处理形成新的(key,value)键值对,通过context.write()输出到OutputCollector收集器
6 OutputCollector把收集到的键值对吸入到环形缓冲区中,环形缓冲区大小默认为100m,只写80%.当环形缓冲区的数据达到80%时,就会触发spill溢出;
shuffle是MR处理流程中的一个过程,它的每一个处理步骤是分散在各个map task和reducetask节点上完成的,整体来看分为三个操作:
分区partitionsort根据key排序Combiner进行局部value的合并7 spill溢出前需要对数据进行分区和排序,即会对环形缓冲区里面的每个键值对hash一个partiition值,相同partition值为一个分区,然后把环形缓冲区中的数组根据partition值和key值两个关键字升序排序;同一partition内的按照key排序;
8 将环形缓冲区中排序后的内存数据不断spill溢出到本地磁盘文件,如果map阶段处理的数据量较大,可能会溢出多个文件
9 多个溢出文件会被merge合并成大的溢出文件,合并采用归并排序,所以合并的maptask最终结果文件还是分区且区内有序的;
10 reduce task根据自己的分区号,去各个map task节点上copy相同partition的数据到reduce task本地磁盘工作目录;
11 reduce task会把同一分区的来自不同maptask的结果文件,再进行merge合并成一个大文件,大文件内容按照key有序;
12,13 合并成大文件后,shuffle的过程也就结束了,后面进入reduce task的逻辑运算过程,首先调用GroupingComparator对大文件里面的数据进行分组,从文件中每次去除一组键值对,调用用户自定义的reduce()方法进行逻辑处理;
14,15 最后通过OutPutFormat方法将结果写道part-r-000**结果文件中
MyMapper类
public class MyMapper extends Mapper<LongWritable, Text,Text,LongWritable> { private LongWritable one = new LongWritable(1); private Text word = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] wds = value.toString().split(" "); System.out.println(wds); for (String wd : wds) { word.set(wd); context.write(word,one); } } }MyReducer类
public class MyReduce extends Reducer<Text,LongWritable,Text,LongWritable>{ private LongWritable res = new LongWritable(); @Override protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { long ressum = 0; for (LongWritable value : values) { ressum += value.get(key); } res.set(ressum); context.write(key,res); } }