菜鸟必须要学的Hadoop之MapReduce

    技术2022-07-10  172

    一、什么是MapReduce

    MapReduce-----一个分布式计算框架 将大型数据操作作业分解为可以跨服务器集群并行执行的单个任务 起源于Goolgle 适用于大规模数据处理场景 每个节点处理存储在该节点的数据 每个job包含Map和Reduce两部分

    二、MapReduce的设计思想

    分而治之 简化并行计算的编程模型 构建抽象模型:Map和Reduce 开发人员专注于实现Mapper和Reducer函数 隐藏系统层细节 开发人员专注于业务逻辑实现

    三、MapReduce的优缺点

    优点: 易于编程 可扩展性 高容错性 高吞吐量 缺点: 难以实时计算 不适合流式计算 不适合DAG(有向图)计算

    四、MapReduce实现WordCount

    将文件存储在多个不同节点的数据块上(一般为128M),Map中计算不同单词,每有一个加一个1,然后在用一个聚合函数将其统计出来。然后将独立计算的拼在一台机器上计算。最后得出结果 用代码来演示: 首先先建立一个mr.wc的包其次建立WCRdecuer的java文件

    public class WCReducer extends Reducer <Text, IntWritable,Text,IntWritable>{ int sum; IntWritable v =new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { //reduce端接收到的类型大概是这样的 wish<1,1,1,1,1,1> //对迭代器进行累加求和 sum = 0; //遍历迭代器 for (IntWritable count : values) { sum+=count.get(); } //将key和value进行写出 v.set(sum); context.write(key,v); } }

    导入的包注意都是MapReduce的包哦。 随后建立WCMapper的java文件

    public class WCMapper extends Mapper<LongWritable, Text,Text, IntWritable> { Text k =new Text(); IntWritable v =new IntWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //1.将文本转化成字符串 String line = value.toString(); //2.将字符串切割 String[] words= line.split("\\s+"); //3.将每一个单词写出去 for (String word : words){ k.set(word); context.write(k,v); } } }

    然后写运行文件:WCDriver

    public class WCDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //1.创建配置文件 Configuration conf = new Configuration(); Job job = Job.getInstance(conf,"wordcount"); //2.设置jar的位置 job.setJarByClass(WCDriver.class); //3.设置Map和Reduce的位置\ job.setMapperClass(WCMapper.class); job.setReducerClass(WCReducer.class); //4.设置Map输出的key,value类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //5.设置Reduce输出的key,value类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //6.设置输入输出路径 // FileInputFormat.setInputPaths(job, new Path(args[0])); FileInputFormat.setInputPaths(job, new Path("C:\\Pratice\\hdfs\\data\\wcinput")); FileOutputFormat.setOutputPath(job, new Path("C:\\Pratice\\hdfs\\data\\wcoutput")); //7.提交程序运行 boolean result =job.waitForCompletion(true); System.exit(result ? 0:1); } }

    **注意:**这里面的路径input的是自己创造的文件的文件夹路径,需要在文件里面写上东西,注意需要加入空格,而output是不需要自己手动创建的,只需要自己写入相同路径且起一个文件夹名字则程序运行后会自己建立。

    如果没有运行成功检查自己的windows系统环境变量有没有配置hadoop的环境变量,且有没有在hadoop的bin目录下加入hadoop.dll和winutils.exe。

    五、Hadoop序列化

    必须可序列化 作用:网络传输以及持久化存储 IntWritable、LongWritable、FloatWriteable、 TextDoubleWriteable,booleanWriteable,NullWriteable等 都继承了Writeable接口 并实现write()和readFilelds()方法 keys必须实现WritableComparable接口 MapReduce框架会按照Key进行排序 Reduce阶段需要sort Keys需要可比较

    六、MapReduce框架原理

    MapReduce执行流程 split阶段 计算分片 map阶段 调用map方法对数据进行处理 shuffle阶段 主要负责将map端生成的数据传递给reduce端 reduce端 对shffle阶段传来的数据进行最后的整理合并

    七、MapReduce核心类介绍

    1.InputFormat接口 定义了图和将数据读入Mapper InputSplit[] getSplits InputSplilt表示由单个Mapper处理的数据 getSplits方法将一个大数据在逻辑上拆分为InputSplits RecodReader<k,v> getRecordReader 常用InputFormat接口实现类 TextInputFormat FileInputFormat KeyValiueInputFormat

    关于InputFormat运行机制: 1.找到你输入数据存储的目录 2.开始遍历目录下的每一个文件 3.遍历第一个文件 4.获取文件的大小 5.计算切片的大小,默认情况下,切片大小会等于块的大小 6.开始切片(假设260M的文件,0-128M,128-256M,256-260M,判断剩下的部分是否大于块的1.1倍,如果不大于1.1,就直接划分为一个切片)

    切片和HDFS的分块有什么区别? 切片:MapReduce中的一个逻辑概念,一个切片就是Mapper任务 切块:HDFS上的物理切割,是一个物理概念 通常情况下,切块的个数等于切片的个数

    2.InputSplit(输入分片) 在map之前,根据输入文件创建inputSplit 每个InputSplit对应一个mapper任务 输入分片存储的是分片长度和记录数据位置的数组

    3.Combiner类 Combine相当于本地化的Reduce操作 在shuffle之前进行本地聚合 用于性能优化,可选项 输入和输出类型一致 Reducer可以被用作Combiner的条件 符合交换律和结合律 实现Combiner job.setCombinerClass(WCReducer.class)

    4.Partitioner类 用于在map端对key进行分区 默认使用的是HashPartitioner 获取key的哈希值 使用key的哈希值对Reducer任务数求模 决定每条记录应该送到哪个Reducer处理 自定义Partitioner 继承抽象类Partitioner,重写getPartition方法 job.setPartitionerClass(MyPartitioner.calss)

    Reducer类与Mapper类如上面编程

    Processed: 0.012, SQL: 9