mapreduce原理

    技术2022-07-11  94

    MapReduce原理


    MapReduce的设计思想

    分而治之 简化并行计算的编程模型 构建抽象模型:Map和Reduce 开发人员专注于实现Mapper和Reducer函数 隐藏系统层细节 开发人员专注于业务逻辑实现

    MapReduce特点

    优点 易于编程可扩展性高容错性高吞吐量 不适用领域 难以实时计算不适合流式计算

    MapReduce执行过程

    数据定义格式 map:(k1,v1)=>(k2,v2)reduce(k2,list(v2))=>list(k3,v3) MapReduce执行过程 MapperConbinerPartitionerShuffle and sortReducer

    Hadoop V1 MR引擎

    Job Tracker 运行在NameNode接受客户端Job请求提交给Task Tracker Task Tracker 从Job Tracker 接受任务请求执行map,reduce等操作返回心跳给Job Tracker

    Hadoop V2 YARN

    用户向YARN中提交应用程序,其中包括AM程序、启动AM的命令、命令参数、用户程序等;事实上,需要准确描述运行ApplicationMaster的unix进程的所有信息。提交工作通常由YarnClient来完成。RM为该应用程序分配第一个Container,并与对应的NM通信,要求它在这个Container中启动AM;AM首先向RM注册,这样用户可以直接通过RM査看应用程序的运行状态,运行状态通过AMRMClientAsync.CallbackHandler的getProgress() 方法来传递给RM。 然后它将为各个任务申请资源,并监控它的运行状态,直到运行结束,即重复步骤4〜7;AM采用轮询的方式通过RPC协议向RM申请和领取资源;资源的协调通过 AMRMClientAsync异步完成,相应的处理方法封装在AMRMClientAsync.CallbackHandler中。—旦AM申请到资源后,便与对应的NM通信,要求它启动任务;通常需要指定一个ContainerLaunchContext,提供Container启动时需要的信息。NM为任务设置好运行环境(包括环境变量、JAR包、二进制程序等)后,将任务启动命令写到一个脚本中,并通过运行该脚本启动任务;各个任务通过某个RPC协议向AM汇报自己的状态和进度,以让AM随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务;ApplicationMaster与NM的通信通过NMClientAsync object来完成,容器的所有事件通过NMClientAsync.CallbackHandler来处理。例如启动、状态更新、停止等。应用程序运行完成后,AM向RM注销并关闭自己。

    Map Reduce原理

    1,2,3,4 map task通过TextInputFormat(RecordReader>>read())读文件,一次读一行,返回(key,value)

    5 上一步的到的(key,value)键值对经过mapper的map()方法逻辑处理形成新的(key,value)键值对,通过context.write()输出到OutputCollector收集器

    6 OutputCollector把收集到的键值对吸入到环形缓冲区中,环形缓冲区大小默认为100m,只写80%.当环形缓冲区的数据达到80%时,就会触发spill溢出;

    shuffle是MR处理流程中的一个过程,它的每一个处理步骤是分散在各个map task和reducetask节点上完成的,整体来看分为三个操作:

    分区partitionsort根据key排序Combiner进行局部value的合并

    7 spill溢出前需要对数据进行分区和排序,即会对环形缓冲区里面的每个键值对hash一个partiition值,相同partition值为一个分区,然后把环形缓冲区中的数组根据partition值和key值两个关键字升序排序;同一partition内的按照key排序;

    8 将环形缓冲区中排序后的内存数据不断spill溢出到本地磁盘文件,如果map阶段处理的数据量较大,可能会溢出多个文件

    9 多个溢出文件会被merge合并成大的溢出文件,合并采用归并排序,所以合并的maptask最终结果文件还是分区且区内有序的;

    10 reduce task根据自己的分区号,去各个map task节点上copy相同partition的数据到reduce task本地磁盘工作目录;

    11 reduce task会把同一分区的来自不同maptask的结果文件,再进行merge合并成一个大文件,大文件内容按照key有序;

    12,13 合并成大文件后,shuffle的过程也就结束了,后面进入reduce task的逻辑运算过程,首先调用GroupingComparator对大文件里面的数据进行分组,从文件中每次去除一组键值对,调用用户自定义的reduce()方法进行逻辑处理;

    14,15 最后通过OutPutFormat方法将结果写道part-r-000**结果文件中

    wordcount

    MyMapper类

    public class MyMapper extends Mapper<LongWritable, Text,Text,LongWritable> { private LongWritable one = new LongWritable(1); private Text word = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] wds = value.toString().split(" "); System.out.println(wds); for (String wd : wds) { word.set(wd); context.write(word,one); } } }

    MyReducer类

    public class MyReduce extends Reducer<Text,LongWritable,Text,LongWritable>{ private LongWritable res = new LongWritable(); @Override protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { long ressum = 0; for (LongWritable value : values) { ressum += value.get(key); } res.set(ressum); context.write(key,res); } }
    Processed: 0.010, SQL: 9