地表最强系列之带你学MapReduce

    技术2024-07-22  69

    什么是MapReduce

    MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。概念"Map(映射)“和"Reduce(归约)”,是它们的主要思想,都是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性。它极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。 当前的软件实现是指定一个Map(映射)函数,用来把一组键值对映射成一组新的键值对,指定并发的Reduce(归约)函数,用来保证所有映射的键值对中的每一个共享相同的键组。 map task通过InputFormat接口中的TextInputFormat来读取文件

    MapReduce的优缺点

    优点: 1.Mapreduce易于编程. 它简单的实现一些接口,就可以完成一个分布式程序,这个程序可以分布到大量的廉价的pc机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。就是因为这个特性使的Mapreduce编程变得非常流行。 2.良好的扩展性. 项目当你的计算资源得不到满足的时候,你可以通过简单的通过增加机器来扩展它的计算能力 3.高容错性 Mapreduce的设计初衷就是使程序能够部署在廉价的pc机器上,这就要求它具有很高的容错性。比如一个机器挂了,它可以把上面的计算任务转移到另一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由hadoop内部完成的。 适合PB级以上海量数据的离线处理

    缺点: 1.无法进行实时计算 2.无法进行流式计算 3.DAG(有向图)计算。多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce 并不是不能做,而是使用后,每个MapReduce 作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下。

    MapReduce的流程

    MapReduce的流程图

    MapReduce的源码解析

    InputFormat的源码 public abstract class InputFormat<K, V> { //这里的getSplits方法负责将一个大数据在逻辑上拆分成一个或多个的InputSplit.每一个InputSplit记录两个参数,第一个为这个分片数据的位置,第二个为这个分片数据的大小.InputSplit并没有真正的储存数据,只是提供了一个如何将数据分片的方法 public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException; //根据InputSplit的方法,返回一个能够读取分片记录的RecordReader. public abstract RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException; } } RecordReader的源码 public class TextInputFormat extends FileInputFormat<LongWritable, Text> implements public RecordReader<LongWritable, Text> getRecordReader(InputSplit genericSplit, JobConf job, Reporter reporter) throws IOException { reporter.setStatus(genericSplit.toString()); String delimiter = job.get("textinputformat.record.delimiter"); byte[] recordDelimiterBytes = null; if (null != delimiter) { //根据系统取相对应的换行符 recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8); } //分片是按照一行语句来分片的,标准就是要有换行符,这里就是实现的这个过程,找到换行符,将它和语句一起放到这个LineRecordReader这个数组里面,一起返回 return new LineRecordReader(job, (FileSplit)genericSplit, recordDelimiterBytes); } } InputSplit的源码 public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { Stopwatch sw = (new Stopwatch()).start(); FileStatus[] files = this.listStatus(job); job.setLong("mapreduce.input.fileinputformat.numinputfiles", (long)files.length); long totalSize = 0L;//设置文件的总尺寸 FileStatus[] arr$ = files; int len$ = files.length; for(int i$ = 0; i$ < len$; ++i$) { FileStatus file = arr$[i$]; if (file.isDirectory()) { //判断文件是否是目录,如果是的话就抛出异常 throw new IOException("Not a file: " + file.getPath()); } //不是的话就将文件进行累加 totalSize += file.getLen(); } //numSplits,将文件切分的数量 long goalSize = totalSize / (long)(numSplits == 0 ? 1 : numSplits); long minSize = Math.max(job.getLong("mapreduce.input.fileinputformat.split.minsize", 1L), this.minSplitSize); //FileSplit继承了InputSpilt类,其中的属性包括Path getPath()(当前文件的路径),long getStart(),long getLength().作用是记录分片切割的位置,方便后来的拼接,以及区分分片 ArrayList<FileSplit> splits = new ArrayList(numSplits); NetworkTopology clusterMap = new NetworkTopology(); FileStatus[] arr$ = files; int len$ = files.length; for(int i$ = 0; i$ < len$; ++i$) { FileStatus file = arr$[i$]; Path path = file.getPath();//获取当前文件的路径 long length = file.getLen(); if (length == 0L) { splits.add(this.makeSplit(path, 0L, length, new String[0])); } else { FileSystem fs = path.getFileSystem(job); BlockLocation[] blkLocations;//BlockLocation里面有hosts数组和offset偏移量 if (file instanceof LocatedFileStatus) { blkLocations = ((LocatedFileStatus)file).getBlockLocations(); } else { blkLocations = fs.getFileBlockLocations(file, 0L, length); } if (!this.isSplitable(fs, path)) { String[][] splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, 0L, length, clusterMap); splits.add(this.makeSplit(path, 0L, length, splitHosts[0], splitHosts[1])); } else { long blockSize = file.getBlockSize(); long splitSize = this.computeSplitSize(goalSize, minSize, blockSize);//这里是计算一个分片的大小是多少. //computeSplitSize方法:return Math.max(minSize, Math.min(goalSize, blockSize));取的是最小尺寸(默认是一个字节),文件大小,以及block块的大小的中值 long bytesRemaining; String[][] splitHosts; for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) { //这理是最后的文件如果小于1.1倍,剩余的0.1的文件直接和上一个块合并 splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, splitSize, clusterMap); splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, splitHosts[0], splitHosts[1])); } if (bytesRemaining != 0L) { splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, bytesRemaining, clusterMap); splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, splitHosts[0], splitHosts[1])); } } } } sw.stop(); if (LOG.isDebugEnabled()) { LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.elapsedMillis()); } return (InputSplit[])splits.toArray(new FileSplit[splits.size()]); } InputSplit的源码 目的是区分分片 public abstract class InputSplit { public InputSplit() { } public abstract long getLength() throws IOException, InterruptedException; public abstract String[] getLocations() throws IOException, InterruptedException;//切片的位置 @Evolving public SplitLocationInfo[] getLocationInfo() throws IOException { return null; } } 我们在写Mapper代码的时候设置的变量(LongWritable,Text)就是map的key和value。LongWritable表示偏移量可以理解为记录的行号,Text表示一行的语句 public class Mapper extends org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,Text,IntWritable> 经过分片后获得的键值对key和value放到Mapper中的map方法进行逻辑处理形成新的键值对,在通过context.write方法输出到OutputCollector收集器中.OutputCollector把收集到的(k,v)键值对写入到环形缓冲区中(环形的优势:输入的时候会先清除,清除完了再输入.并且内存利用率高),环形缓冲区默认的大小为100M,只写80%(因为填充因子,剩下的20%要记录环形缓冲区的信息).然后将环形缓冲区的内容拿出来根据HashPartitioner来进行分区(因为要散列,就是将内容平均的分开所以用Hash来分区)然后要根据key进行快排分好区,接着用Combiner来进行归并将相同的归类,形成一个大文件;reduce task根据自己的分区号,去各个map task节点上copy相同的partition的数据到reduce task 本地磁盘工作目录,并且reduce task 会吧同一分区的来自不同的map task的结果文件,在进行merge合并成一个大文件(归并排序),大文件内容按照k有序
    Processed: 0.039, SQL: 9