MapReduce Source Code Analysis (Part 1): The FileInputFormat Split Mechanism, Explained Through the Source



1. InputFormat: an abstract class

It declares only two abstract methods:

    public abstract List<InputSplit> getSplits(JobContext var1) throws IOException, InterruptedException;

    public abstract RecordReader<K, V> createRecordReader(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException;

2. FileInputFormat: an abstract class

It extends InputFormat.

It implements getSplits but leaves createRecordReader to its subclasses. TextInputFormat, which handles plain text files, provides the createRecordReader implementation discussed here. Another subclass, SequenceFileInputFormat, reads Hadoop's binary key/value file format and is worth exploring on its own; this article focuses on text files.
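
For orientation, here is a minimal driver sketch showing where these classes plug into a job. The input path argument is a placeholder, and TextInputFormat is in fact already the default input format:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    public class DriverSketch {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance();
            job.setJarByClass(DriverSketch.class);
            // TextInputFormat is the default; set explicitly here for clarity
            job.setInputFormatClass(TextInputFormat.class);
            // Becomes mapreduce.input.fileinputformat.inputdir under the hood
            FileInputFormat.addInputPath(job, new Path(args[0]));
            // ... mapper, reducer, and output setup omitted ...
        }
    }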

Key parameters:

They can be configured in Hadoop's mapred-site.xml (or programmatically, as sketched after the listing):

    // INPUT_DIR: input paths, comma-separated
    public static final String INPUT_DIR = "mapreduce.input.fileinputformat.inputdir";
    // SPLIT_MAXSIZE: maximum split size
    public static final String SPLIT_MAXSIZE = "mapreduce.input.fileinputformat.split.maxsize";
    // SPLIT_MINSIZE: minimum split size
    public static final String SPLIT_MINSIZE = "mapreduce.input.fileinputformat.split.minsize";
    // PATHFILTER_CLASS: input path filter; only files that pass the filter reach the InputFormat
    public static final String PATHFILTER_CLASS = "mapreduce.input.pathFilter.class";
    // SPLIT_SLOP: while (remaining length / split size) exceeds 1.1, keep cutting full splits;
    // once it drops to 1.1 or below, the remainder is folded into one final split.
    // Splits are logical (virtual); no data is physically moved.
    private static final double SPLIT_SLOP = 1.1;
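
The same keys can also be set programmatically on the job's Configuration; a minimal sketch with illustrative paths and values:

    import org.apache.hadoop.conf.Configuration;

    public class SplitConfigExample {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // Comma-separated input paths (INPUT_DIR)
            conf.set("mapreduce.input.fileinputformat.inputdir", "/data/in1,/data/in2");
            // Clamp the split size between 1 byte and 128 MB
            conf.setLong("mapreduce.input.fileinputformat.split.minsize", 1L);
            conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 128L * 1024 * 1024);
        }
    }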

Key methods:

    protected List<FileStatus> listStatus(JobContext job)

Recursively collects every file (with its FileStatus metadata) under the input directories. The job argument carries the running Configuration, which includes the parameters listed above.

    protected List<FileStatus> listStatus(JobContext job) throws IOException {
        Path[] dirs = getInputPaths(job);
        if (dirs.length == 0) {
            throw new IOException("No input paths specified in job");
        } else {
            TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job.getConfiguration());
            // Whether to recurse into subdirectories
            boolean recursive = getInputDirRecursive(job);
            // Always filter out hidden files; add the user-supplied filter if one is set
            List<PathFilter> filters = new ArrayList();
            filters.add(hiddenFileFilter);
            PathFilter jobFilter = getInputPathFilter(job);
            if (jobFilter != null) {
                filters.add(jobFilter);
            }
            PathFilter inputFilter = new FileInputFormat.MultiPathFilter(filters);
            List<FileStatus> result = null;
            // List directories with one thread by default, or in parallel if configured
            int numThreads = job.getConfiguration().getInt("mapreduce.input.fileinputformat.list-status.num-threads", 1);
            Stopwatch sw = (new Stopwatch()).start();
            if (numThreads == 1) {
                result = this.singleThreadedListStatus(job, dirs, inputFilter, recursive);
            } else {
                Iterable locatedFiles = null;
                try {
                    LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
                            job.getConfiguration(), dirs, recursive, inputFilter, true);
                    locatedFiles = locatedFileStatusFetcher.getFileStatuses();
                } catch (InterruptedException var12) {
                    throw new IOException("Interrupted while getting file statuses");
                }
                result = Lists.newArrayList(locatedFiles);
            }
            sw.stop();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Time taken to get FileStatuses: " + sw.elapsedMillis());
            }
            LOG.info("Total input paths to process : " + ((List)result).size());
            return (List)result;
        }
    }
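
From the driver side, a sketch of the two knobs listStatus honors: a custom PathFilter (the jobFilter above) and recursive listing. MyLogFilter is a hypothetical filter class, and setInputDirRecursive is available on recent mapreduce-API versions of FileInputFormat; hidden files (names starting with . or _) are rejected regardless by hiddenFileFilter:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    public class ListStatusKnobs {
        // Hypothetical filter: keep only *.log files
        public static class MyLogFilter implements PathFilter {
            @Override
            public boolean accept(Path path) {
                return path.getName().endsWith(".log");
            }
        }

        public static void configure(Job job) {
            FileInputFormat.setInputPathFilter(job, MyLogFilter.class); // the jobFilter above
            FileInputFormat.setInputDirRecursive(job, true);            // the recursive flag above
        }
    }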

    public List<InputSplit> getSplits(JobContext job)

This method plans the logical splits over the input files.

Parameter: JobContext job

    public List<InputSplit> getSplits(JobContext job) throws IOException {
        // Start a stopwatch to time split planning
        Stopwatch sw = new Stopwatch().start();
        // Minimum split size: getFormatMinSplitSize() returns 1L; getMinSplitSize(job) is configurable, default 1L
        long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
        // Maximum split size: configurable, default Long.MAX_VALUE (9223372036854775807L)
        long maxSize = getMaxSplitSize(job);

        // generate splits
        // Collects the logical splits
        List<InputSplit> splits = new ArrayList<InputSplit>();
        // listStatus(job) gathers every file under the input directories
        List<FileStatus> files = listStatus(job);
        for (FileStatus file : files) {
            Path path = file.getPath();
            long length = file.getLen();
            if (length != 0) {
                // Block locations of the file (the BlockLocation fields are described below)
                BlockLocation[] blkLocations;
                if (file instanceof LocatedFileStatus) {
                    // The file status already carries its block locations
                    blkLocations = ((LocatedFileStatus) file).getBlockLocations();
                } else {
                    // Otherwise query the FileSystem for the block locations
                    FileSystem fs = path.getFileSystem(job.getConfiguration());
                    blkLocations = fs.getFileBlockLocations(file, 0, length);
                }
                if (isSplitable(job, path)) {
                    long blockSize = file.getBlockSize();
                    // Compute the logical split size (computeSplitSize is described below)
                    long splitSize = computeSplitSize(blockSize, minSize, maxSize);

                    long bytesRemaining = length;
                    // Keep cutting full splits while more than 1.1 splits' worth of data remains
                    while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
                        int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
                        splits.add(makeSplit(path, length - bytesRemaining, splitSize,
                                blkLocations[blkIndex].getHosts(),
                                blkLocations[blkIndex].getCachedHosts()));
                        bytesRemaining -= splitSize;
                    }
                    // The remainder is now either 0 or at most 1.1 x splitSize; it becomes the last split
                    if (bytesRemaining != 0) {
                        int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
                        splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                                blkLocations[blkIndex].getHosts(),
                                blkLocations[blkIndex].getCachedHosts()));
                    }
                } else { // not splitable: the whole file becomes a single split
                    splits.add(makeSplit(path, 0, length,
                            blkLocations[0].getHosts(),
                            blkLocations[0].getCachedHosts()));
                }
            } else {
                // Create empty hosts array for zero length files
                splits.add(makeSplit(path, 0, length, new String[0]));
            }
        }
        // Save the number of input files for metrics/loadgen
        job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
        // Stop the stopwatch
        sw.stop();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Total # of splits generated by getSplits: " + splits.size()
                    + ", TimeTaken: " + sw.elapsedMillis());
        }
        return splits;
    }

    protected FileSplit makeSplit(Path file, long start, long length,
                                  String[] hosts, String[] inMemoryHosts)

Creates a split object:

    protected FileSplit makeSplit(Path file, long start, long length,
                                  String[] hosts, String[] inMemoryHosts) {
        return new FileSplit(file, start, length, hosts, inMemoryHosts);
    }
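
Stepping back to the split loop in getSplits: to see what the SPLIT_SLOP test buys, here is a standalone sketch (plain Java, not Hadoop code; the file and split sizes are hypothetical) that replays the loop for a 260 MB file with a 128 MB split size. It produces two splits of 128 MB and 132 MB rather than three of 128 MB, 128 MB, and a wasteful 4 MB tail:

    public class SplitSlopDemo {
        private static final double SPLIT_SLOP = 1.1;

        public static void main(String[] args) {
            long length = 260L * 1024 * 1024;     // hypothetical file: 260 MB
            long splitSize = 128L * 1024 * 1024;  // hypothetical split size: 128 MB

            long bytesRemaining = length;
            while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
                System.out.printf("split: offset=%d length=%d%n", length - bytesRemaining, splitSize);
                bytesRemaining -= splitSize;
            }
            if (bytesRemaining != 0) {
                // 132 MB remain and 132/128 is about 1.03 <= 1.1, so the tail becomes
                // one slightly oversized final split
                System.out.printf("split: offset=%d length=%d%n", length - bytesRemaining, bytesRemaining);
            }
        }
    }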

makeSplit returns a FileSplit, which extends the abstract class InputSplit and implements the Writable serialization interface. Its fields:

    public class FileSplit extends InputSplit implements Writable {
        private Path file;                     // file path
        private long start;                    // start offset within the file
        private long length;                   // split length
        private String[] hosts;                // hosts holding the data
        private SplitLocationInfo[] hostInfos;
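
As a usage note, a common pattern is to downcast the InputSplit inside a Mapper to recover which file and byte range the task is reading; a sketch, assuming the input format actually produces FileSplits (the class name is illustrative):

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    public class PathAwareMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        protected void setup(Context context) {
            FileSplit split = (FileSplit) context.getInputSplit();
            System.out.println("Reading " + split.getPath().getName()
                    + " from offset " + split.getStart()
                    + " for " + split.getLength() + " bytes");
        }
    }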

    protected long computeSplitSize(long blockSize, long minSize, long maxSize)

Computes the split size as the middle value of the minimum split size, the block size, and the maximum split size. Setting the minimum and maximum therefore changes the split size; with the defaults, the split size simply equals the block size.

    protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
        // the median of minSize, blockSize, and maxSize
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }
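
A quick standalone check of the median rule, assuming an illustrative 128 MB HDFS block size:

    public class ComputeSplitSizeDemo {
        static long computeSplitSize(long blockSize, long minSize, long maxSize) {
            return Math.max(minSize, Math.min(maxSize, blockSize));
        }

        public static void main(String[] args) {
            long blockSize = 128L * 1024 * 1024;
            // defaults (minSize = 1, maxSize = Long.MAX_VALUE): split size == block size
            System.out.println(computeSplitSize(blockSize, 1L, Long.MAX_VALUE));                 // 128 MB
            // lowering maxSize below the block size shrinks the splits
            System.out.println(computeSplitSize(blockSize, 1L, 64L * 1024 * 1024));              // 64 MB
            // raising minSize above the block size enlarges them
            System.out.println(computeSplitSize(blockSize, 256L * 1024 * 1024, Long.MAX_VALUE)); // 256 MB
        }
    }

In driver code the same effect is usually achieved with FileInputFormat.setMinInputSplitSize(job, ...) and FileInputFormat.setMaxInputSplitSize(job, ...).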

TextInputFormat

Key method: createRecordReader

Parameters: InputSplit split, TaskAttemptContext context


The concrete class TextInputFormat extends FileInputFormat:

    public class TextInputFormat extends FileInputFormat<LongWritable, Text>

and overrides createRecordReader:

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
                                                               TaskAttemptContext context) {
        // textinputformat.record.delimiter: a user-defined record delimiter; by default
        // (when the key is unset) records are delimited by newlines
        String delimiter = context.getConfiguration().get(
                "textinputformat.record.delimiter");
        byte[] recordDelimiterBytes = null;
        if (null != delimiter)
            // encode the delimiter as UTF-8 bytes
            recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
        // return a LineRecordReader that splits the stream on the delimiter
        return new LineRecordReader(recordDelimiterBytes);
    }
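
For example, to split records on blank lines instead of single newlines, set the same key that createRecordReader reads; a minimal sketch:

    import org.apache.hadoop.conf.Configuration;

    public class DelimiterConfig {
        public static void apply(Configuration conf) {
            // records separated by a blank line rather than '\n'
            conf.set("textinputformat.record.delimiter", "\n\n");
        }
    }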

3. BlockLocation: block location

It mainly stores the metadata of a data block.

Fields:

    // hosts: hostnames of the datanodes storing the block
    private String[] hosts;
    // hosts caching the block in memory
    private String[] cachedHosts;
    // names of the datanodes holding the block
    private String[] names;
    // network topology paths to the block
    private String[] topologyPaths;
    // offset of the block within the file
    private long offset;
    // block length
    private long length;
    // whether the block is corrupt
    private boolean corrupt;
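
To inspect these values directly, you can make the same call that getSplits falls back to for a plain FileStatus; a sketch (the path is a placeholder):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class BlockLocationDemo {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            FileStatus status = fs.getFileStatus(new Path("/data/in/part-00000"));
            BlockLocation[] blocks = fs.getFileBlockLocations(status, 0, status.getLen());
            for (BlockLocation b : blocks) {
                System.out.printf("offset=%d length=%d hosts=%s%n",
                        b.getOffset(), b.getLength(), String.join(",", b.getHosts()));
            }
        }
    }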
