MapReduce过程实现(shuffle阶段)

    技术2025-12-16  13

    文章目录

    一、FileInputFormat进行切片(一)FileInputFormat继承InputFormat,getSplites()方法(二)getRecordReader 二、MapTask工作机制(一).Read 阶段:(二).Map 阶段:(三).Collect 收集阶段:(四).默认 partition 分区(五).Spill 阶段:(六).Combine 阶段: 三、ReduceTask 工作机制(一)Copy 阶段:(二)Merge 阶段:(三)Sort 阶段:(四)Reduce 阶段:

    一、FileInputFormat进行切片

    InputFormat 的主要功能就是确定每一个 map 任务需要读取哪些数据以及如何读取数据的问题,每一个 map 读取哪些数据由 InputSplit(数据切片)决定,如何读取数据由 RecordReader 来决定。InputFormat 中就有获取 InputSplit 和RecordReader 的方法。 hadoop的默认输入格式为TextInputFormat,是FileInputFormat的实现类

    (一)FileInputFormat继承InputFormat,getSplites()方法

    验证输入的文件路径是否为有效文件将输入文件进行切片,默认的大小等于block块的大小(默认为128M),在切片过程中,要判断切完剩下的部分是否大于块的 1.1 倍,不大于 1.1 倍就划分一块切片 注意:数据切片只是在逻辑上对输入数据进行分片,并不会再磁盘上将其切分成分片进行存储。InputSplit 只记录了分片的元数据信息,比如起始位置、长度以及所在的节点列表等。

    (二)getRecordReader

    按照换行符分隔进行读取,即每次读取一行

    inputformat输入键值对,key为行偏移量,value为本行的内容(不包括任何换行终止符)

    二、MapTask工作机制

    (一).Read 阶段:

    Map Task 通过用户编写的 RecordReader,按照 InputSplit 记录的位置信息读取数据,从中解析出一个个<Key,Value>。

    (二).Map 阶段:

    将解析出的 key/value 交给用户编写 map()函数处理,并产生一系列新的 key/value。

    (三).Collect 收集阶段:

    在用户编写 map()函数中,当数据处理完成后,一般会调用 OutputCollector.collect()输出结果。在该函数内部,它会将生成的 key/value分区(调用 Partitioner),并写入一个环形内存缓冲区中(分区是在写入缓存的时候完成的)。

    (四).默认 partition 分区

    public class HashPartitioner<K, V> extends Partitioner<K, V> { public int getPartition(K key, V value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; } }

    默认分区是根据 key 的 hashCode 对 reduceTasks 个数取模得到的。用户没法控制哪个 key 存储到哪个分区 如果想根据设定的条件分区,可以自定义类继承 Partitioner,重写 getPartition()方法

    注意: 如果 reduceTask 的数量> getPartition 的结果数,则会多产生几个空的输出文 件 part-r-000xx; 如果 1<reduceTask 的数量<getPartition 的结果数,则有一部分分区数据无处安放,会 Exception; 如果reduceTask 的数量=1,则不管 mapTask 端输出多少个分区文件,最终结 果都交给这一个reduceTask,最终也就只会产生一个结果文件 part-r-00000;

    (五).Spill 阶段:

    当缓存区达到一定的比例之后,一个后台线程开始把缓存区的数据写入到磁盘,这个写入的过程叫做Spill,即“溢写”,开始Spill的比例默认是0.8,当环形缓冲区达到0.8时,将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作 溢写阶段详情: 步骤 1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号 partition 进行排序,然后按照 key 进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照 key 有序。 步骤 2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N 表示当前溢写次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。 步骤 3:将分区数据的元信息写到内存索引数据结构 SpillRecord 中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过 1MB ,则将内存索引写到文件output/spillN.out.index 中

    (六).Combine 阶段:

    当所有数据处理完成后,MapTask 对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。 当所有数据处理完后,MapTask 会将所有临时文件合并成一个大文件,并保存到文件 output/file.out 中,同时生成相应的索引文件 output/file.out.index。在进行文件合并过程中,MapTask 以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并 io.sort.factor(默认 100)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。让每个MapTask 最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。

    三、ReduceTask 工作机制

    (一)Copy 阶段:

    ReduceTask 从各个 MapTask 上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。

    (二)Merge 阶段:

    在远程拷贝数据的同时,ReduceTask 启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。

    (三)Sort 阶段:

    按照 MapReduce 语义,用户编写 reduce()函数输入数据是按 key 进行聚集的一组数据。为了将 key 相同的数据聚在一起,Hadoop 采用了基于排序的策略。由于各个 MapTask 已经实现对自己的处理结果进行了局部排序,因此,ReduceTask 只需对所有数据进行一次归并排序即可。

    (四)Reduce 阶段:

    reduce()函数将计算结果写到 HDFS 上。

    Processed: 0.014, SQL: 9