浅析Mapreduce工作原理

    技术2026-01-29  10

    MapReduce各阶段工作原理图: 下面讲解MapReduce的工作过程及原理: 1.split阶段 对于传入进来的大文件,采用TextInputFormat文件输入方式,然后对其进行物理分片,按照128M一个分片来分成若干map tasks。如果传入的是一个一个小文件,可以采用CombinerInputFormat方式传入文件,将其聚合成一个大文件。 2.一行一行读 使用的是TextInputFormat类中的getRecordReader方法来获取行信息,源码如下:

    public RecordReader<LongWritable, Text> getRecordReader(InputSplit genericSplit, JobConf job, Reporter reporter) throws IOException { reporter.setStatus(genericSplit.toString()); String delimiter = job.get("textinputformat.record.delimiter"); ---获取结束符,这里的结束符是可以设置的,默认是 byte[] recordDelimiterBytes = null; if (null != delimiter) { recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8); } return new LineRecordReader(job, (FileSplit)genericSplit, recordDelimiterBytes); ----返回一行读取信息的实例` }

    这里返回的LineRecordReader是RecordReader抽象类的一个实例,返回的内容是一行的分片信息。(偏移量和行内容) 3.map业务逻辑 将获取到的一行内容经过Mapper类的map方法得到新的key和value,这里面具体的业务逻辑需要根据具体的实现进行编写。 4.shuffle阶段(环形缓冲区) map端写出的key和value在被写到磁盘里之前会输送给outputCollector(环形缓冲区),环形缓冲区的默认大小为100M(可以通过参数调整缓冲区的大小),当写到80%时才会溢写到磁盘(80%也是默认的,可以通过mapreduce.map.sort.spill.percent配置)。在map输出的数据输送给reduce端之前,往往需要经过以下几个过程: (1) Combiner:对map task节点上相同的key作value的合并,这是一种优化手段,可以设置Combiner,在需要的时候调用Combiner即可。 (2) partitioner:对同一分片中的key进行hash取值,相同的partition值放在一个分区中,取值的源码如下:

    public int getPartition(K2 key, V2 value, int numReduceTasks) { return (key.hashCode() & 2147483647) % numReduceTasks; }

    这里对key进行hash,然后逻辑与一个“2147483647”(因为hash的取值可能产生负值,所以强制位与int类型的最大值,去除符号位) 这里的分区通常情况下是需要根据业务逻辑进行重写,需要设置分区个数。 partition的值会默认进行排序,这里需要对key的值进行排序,使partition和key值都是有序的。(默认是升序排序) (4) merge:调用归并排序方法对相同的partition值归并在一起形成一个大的文件,也就是map task最终的结果文件。 (5) reduce task会根据自己的分区值去不同的map task中copy对应的partition放到reduce task中,将不同的文件再次使用归并排序合并成一个大文件,然后在写到磁盘里,这里如果设置了Combiner,会调用Combiner进行优化。 5.reduce阶段 reduce task任务是将归并后的大文件进行逻辑处理,首先调用的就是GroupComparator对文件进行分组,每次reduce取一组key和values的值,使用自己重写的reduce方法对其进行逻辑处理。 最后将处理好的文件通过TextOutputFormat的方法写到part-r-000**中。

    Processed: 0.014, SQL: 9