MapReduce初学

    技术2022-07-12  81

    目录

    一.关于MapReduce(一)什么是MapReduce?(二) MapReduce的设计思想(三) MapReduce特点(四)MapReduce实现WordCount(五)MapReduce执行过程(六)Key&Value类型 二.MapReduce编程模型(一)InputFormat接口(二)Mapper类(三)Combiner类(五)Reducer类(六)OutputFormat接口(七)Job类执行任务,代码如下:(八)InputSplit(数据切片)

    一.关于MapReduce

    (一)什么是MapReduce?

    1.MapReduce是一个分布式计算框架

    它将大型数据操作作业分解为可以跨服务器集群并行执行的单个任务。起源于Google,它是一个编程模型,用于大数据量的计算

    2.适用于大规模数据处理场景

    每个节点处理存储在该节点的数据

    3.每个job包含Map和Reduce两部分

    (二) MapReduce的设计思想

    1.分而治之

    简化并行计算的编程模型,即把对大规模数据集的操作分发给一个主节点管理下的各个子节点共同完成,然后整合各个子节点的中间结果,从而得到最终的计算结果

    2 构建抽象模型:Map和Reduce

    开发人员专注于实现Mapper和Reducer函数

    3.隐藏系统层细节

    开发人员专注于业务逻辑实现

    (三) MapReduce特点

    优点

    1.易于编程 MapReduce只需简单地实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价的PC机上运行 2.良好的可扩展性 当计算机资源得不到满足的时候,可以通过简单的增加机器来扩展它的计算能力 3.高容错性 比如一台机器挂了,可以把它上面的计算任务转移到另一个节点上运行,不至于整个任务运行失败,而且这个过程不需要人工干预, 完全由hadoop内部完成 4.高吞吐量 能对PB量级以上海量数据进行离线处理.适合离线处理而不适合实时处理,比如要求毫秒级返回一个结果,会很难做到

    不适用领域

    1.难以实时计算 2.不适合流式计算 3.DAG(有向图)计算

    (四)MapReduce实现WordCount

    (五)MapReduce执行过程

    1.数据定义格式

    map: (K1,V1) → list (K2,V2) reduce: (K2,list(V2)) → list (K3,V3)

    2.MapReduce执行过程

    Mapper Combiner Partitioner Shuffle and Sort Reducer

    (六)Key&Value类型

    1.必须可序列化(serializable hadoop实现的是Writable接口)

    作用:网络传输以及持久化存储IntWritable、LongWriteable、FloatWritable、Text、DoubleWritable, BooleanWritable、NullWritable等

    2.都继承了Writable接口

    并实现write()和readFields()方法

    3.Keys必须实现WritableComparable接口

    Reduce阶段需要sortkeys需要可比较

    二.MapReduce编程模型

    (一)InputFormat接口

    (二)Mapper类

    Mapper主要方法

    1.void setup(Context context) org.apache.hadoop.mapreduce.Mapper.Context 2.void map(KEY key, VALUE value, Context context) 为输入分片中的每个键/值对调用一次 3.void cleanup(Context context) 4.void run(Context context) 可通过重写该方法对Mapper进行更完整控制

    代码示例如下:

    public class WCMapper extends Mapper<LongWritable, Text,Text, IntWritable> { Text k=new Text(); IntWritable v=new IntWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //1.将文本转化为字符串String String line=value.toString(); //2.将字符串切割 String []words=line.split("\\s+"); //3.将每个单词循环写出 for (String word : words) { k.set(word); context.write(k,v); } } }

    (三)Combiner类

    1.Combiner相当于本地化的Reduce操作

    在shuffle之前进行本地聚合用于性能优化,可选项输入和输出类型一致

    2.Reducer可以被用作Combiner的条件

    符合交换律和结合律

    3.实现Combiner

    job.setCombinerClass(WCReducer.class)

    (四)Partitioner类 1.用于在Map端对key进行分区

    默认使用的是HashPartitioner 1)获取key的哈希值 2)使用key的哈希值对Reduce任务数求模决定每条记录应该送到哪个Reducer处理

    2.自定义Partitioner

    继承抽象类Partitioner,重写getPartition方法job.setPartitionerClass(MyPartitioner.class)

    (五)Reducer类

    Reducer主要方法如下:

    1.void setup(Context context)

    org.apache.hadoop.mapreduce.Reducer.Context

    2.void reduce(KEY key, Iterable values, Context context)

    为每个key调用一次

    3.void cleanup(Context context)

    4.void run(Context context)

    可通过重写该方法来控制reduce任务的工作方式

    (六)OutputFormat接口

    1.定义了如何将数据从Reducer进行输出

    RecordWriter<K,V> getRecordWriter 将Reducer的<key,value>写入到目标文件checkOutputSpecs 判断输出目录是否存在

    2.常用OutputFormat接口实现类

    TextOuputFormatSequenceFileOuputFormatMapFileOuputFormat

    (七)Job类执行任务,代码如下:

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //1.创建配置文件 Configuration conf=new Configuration(); //2.创建 Job job=Job.getInstance(conf,"wordcount"); //3.设置Map和Reduce位置 job.setMapperClass(WCMapper.class); job.setReducerClass(WCReducer.class); //4.设置Map输出的key,value类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //5.设置reduce输出的key,value类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //6.设置输入输出的路径 // FileInputFormat.setInputPaths(job,new Path("file:///E:\\Program File (x86)\\IDEA\\hdfs\\data\\wcinput")); // FileOutputFormat.setOutputPath(job,new Path("file:///E:\\Program File (x86)\\IDEA\\hdfs\\data\\wcoutput")); FileInputFormat.setInputPaths(job,new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); //7.提交程序运行 boolean result =job.waitForCompletion(true); System.exit(result? 0: 1); }

    (八)InputSplit(数据切片)

    1.找到你输入数据存储的目录 2.开始遍历目录下每一个文件 3.遍历第一个文件,wc.txt 4.获取文件大小 5.计算切片的大小,默认情况下切片大小会等于块大小 6.开始切片(假设260M文件,0-128M,128-256M,256M-260M,判断剩下的部分是否大于块的1.1倍,如果不大于1.1,就直接划分为一个切片) 切片和数据块区别?

    切片:MapReduce中的逻辑概念,一个切片就是一个Mapper任务 数据库Block:hdfs上的物理切割,是一个物理概念 通常情况下切片的个数等于数据块的个数

    定义了如何将数据读入Mapper

    InputSplit[] getSplits InputSplit表示由单个Mapper处理的数据 getSplits方法将一个大数据在逻辑上拆分为InputSplit RecordReader<K,V> getRecordReader 常用InputFormat接口实现类

    TextInputFormat FileInputFormat KeyValueInputForma

    Processed: 0.014, SQL: 9