1.InputFormatInputFormat常见子类包括: TextInputFormat (普通文本文件,MR框架默认的读取实现类型) KeyValueTextInputFormat(读取一行文本数据按照指定分隔符,把数据封装为kv类型) NLineInputF ormat(读取数据按照行数进行划分分片) CombineTextInputFormat(合并小文件,避免启动过多MapTask任务) 自定义InputFormat
2.CombineTextInputFormat// 如果不设置InputFormat,它默认用的是TextInputFormat.class job.setInputFormatClass(CombineTextInputFormat.class); //虚拟存储切片最大值设置4m CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
CombineTextInputFormat切片原理 切片生成过程分为两部分:虚拟存储过程和切片过程 假设设置 setMaxInputSplitSize 值为 4M 四个小文件: 1.txt -->2M ;2.txt-->7M;3.txt-->0.3M;4.txt--->8.2M 虚拟存储过程:把输入目录下所有文件大小,依次和设置的 setMaxInputSplitSize 值进行比 较,如果不大于设置的最大值,逻辑上划分一个块 。如果输入文件大于设置的最大值且大于 两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值2 倍,此时 将文件均分成2 个虚拟存储块(防止出现太小切片)。 比如如 setMaxInputSplitSize 值为 4M ,输入文件大小为 8.02M ,则先逻辑上分出一个 4M 的 块。剩余的大小为4.02M ,如果按照 4M 逻辑划分,就会出现 0.02M 的非常小的虚拟存储文 件,所以将剩余的4.02M 文件切分成( 2.01M 和 2.01M )两个文件。 1.txt-->2M;2M<4M; 一个块; 2.txt-->7M;7M>4M, 但是不大于两倍,均匀分成两块;两块:每块 3.5M ; 3.txt-->0.3M;0.3<4M ,0.3M<4M , 一个块 4.txt-->8.2M; 大于最大值且大于两倍;一个 4M 的块,剩余 4.2M 分成两块,每块 2.1M 所有块信息: 2M, 3.5M , 3.5M , 0.3M , 4M , 2.1M , 2.1M 共 7 个虚拟存储块。 切片过程 判断虚拟存储的文件大小是否大于setMaxInputSplitSize 值,大于等于则单独形成一个 切片。 如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。 按照之前输入文件:有 4 个小文件大小分别为 2M 、 7M 、 0.3M 以及 8.2M 这四个小文件, 则虚拟存储之后形成7 个文件块,大小分别为: 2M, 3.5M , 3.5M , 0.3M , 4M , 2.1M , 2.1M 最终会形成 3 个切片,大小分别为: (2+3.5 ) M ,( 3.5+0.3+4 ) M ,( 2.1+2.1 ) M 3.自定义 InputFormat 1. extends FileInputFormat 2. 自定义 RecordReader 3.在driver端指定自定义的 InputFormat 代码: public class CustomFileInputformat extends FileInputFormat<Text, BytesWritable> { /** * 文件不可切分 */ @Override protected boolean isSplitable(JobContext context, Path filename) { return false; } /** * createRecordReader 读取文本的对象 * * @param inputSplit * @param taskAttemptContext * @return * @throws IOException * @throws InterruptedException */ @Override public RecordReader<Text, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { CustomRecordReader recordReader = new CustomRecordReader(); recordReader.initialize(inputSplit, taskAttemptContext); return recordReader; } } public class CustomRecordReader extends RecordReader<Text, BytesWritable> { private Configuration configuration; /** * 切片 */ private FileSplit split; /** * 输出的kv */ private Text k = new Text(); private BytesWritable value = new BytesWritable(); /** * 是否读取到内容的标识符 */ private boolean flag = true; /** * 初始化方法 把切片已经上下文提升为全局 * * @param inputSplit * @param taskAttemptContext * @throws IOException * @throws InterruptedException */ @Override public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { //获取到文件切片以及配置文件对象 this.split = (FileSplit) inputSplit; configuration = taskAttemptContext.getConfiguration(); } /** * 用来读取数据的方法 * * @return * @throws IOException * @throws InterruptedException */ @Override public boolean nextKeyValue() throws IOException, InterruptedException { if (flag) { // 1 定义缓存区 存放读取的数据 byte[] contents = new byte[(int) split.getLength()]; FileSystem fs = null; FSDataInputStream fis = null; // 2 获取文件系统 Path path = split.getPath(); fs = path.getFileSystem(configuration); // 3 读取数据 fis = fs.open(path); // 4 读取文件内容 IOUtils.readFully(fis, contents, 0, contents.length); // 5 输出文件内容 value.set(contents, 0, contents.length); // 6 获取文件路径及名称 String name = split.getPath().toString(); // 7 设置输出的key值 k.set(name); IOUtils.closeStream(fis); flag = false; return true; } return false; } /** * 获取key * * @return * @throws IOException * @throws InterruptedException */ @Override public Text getCurrentKey() throws IOException, InterruptedException { return k; } /** * 获取value * * @return * @throws IOException * @throws InterruptedException */ @Override public BytesWritable getCurrentValue() throws IOException, InterruptedException { return value; } /** * 获取进度 * * @return * @throws IOException * @throws InterruptedException */ @Override public float getProgress() throws IOException, InterruptedException { return 0; } /** * 关闭 * * @throws IOException */ @Override public void close() throws IOException { }4.自定义OutputFormat
1. extends FileOutputFormat 2. 自定义 RecordWriter 3.在driver端指定自定义的 OutputFormat 代码: public class CustomOutputFormat extends FileOutputFormat<Text, NullWritable> { @Override public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { //获取文件系统对象 FileSystem fs = FileSystem.get(taskAttemptContext.getConfiguration()); //指定输出数据的文件 Path lagouPath = new Path("e:/lagou.log"); Path otherLog = new Path("e:/other.log"); //获取输出流 final FSDataOutputStream lagouOut = fs.create(lagouPath); final FSDataOutputStream otherOut = fs.create(otherLog); return new CustomWriter(lagouOut, otherOut); } } public class CustomWriter extends RecordWriter<Text, NullWritable> { private FSDataOutputStream lagouOut; private FSDataOutputStream otherOut; public CustomWriter(FSDataOutputStream lagouOut, FSDataOutputStream otherOut) { this.lagouOut = lagouOut; this.otherOut = otherOut; } @Override public void write(Text key, NullWritable nullWritable) throws IOException, InterruptedException { // 判断是否包含“lagou”输出到不同文件 if (key.toString().contains("lagou")) { lagouOut.write(key.toString().getBytes()); lagouOut.write("\r\n".getBytes()); } else { otherOut.write(key.toString().getBytes()); otherOut.write("\r\n".getBytes()); } } @Override public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { IOUtils.closeStream(lagouOut); IOUtils.closeStream(otherOut); } }