MapReduce原理及初步编程

    技术2022-07-11  80

    什么是MapReduce

    MapReduce是一个分布式计算框架:将大型数据操作作业分解为可以跨服务器集群并行执行的单个任务。 起源于Google;适用于大规模数据处理场景。每个节点处理存储在该节点的数据。每个job包含map和reduce两部分。

    MapReduce的设计思想

    分而治之 简化并行计算的编程模型 构建抽象模型:Map和Reduce 开发人员专注于实现Mapper和Reducer函数 隐藏系统层细节,开发人员专注于业务逻辑实现。

    MapReduce特点

    优点:易于编程,可扩展性,高容错性,高吞吐量 不适用领域:难以实时计算,不适合流式计算

    MapReduce执行过程

    Shuffle描述着数据从map task输出到reduce task输入的这段过程。 在Hadoop这样的集群环境中,大部分map task与reduce task的执行是在不同的节点上,当然很多情况下Reduce执行时需要跨节点去拉取其它节点上的map task结果,如果集群正在运行的job有很多,那么task的正常执行对集群内部的网络资源消耗会很严重。这种网络消耗是正常的,我们不能限制,能做的就是最大化地减少不必要的消耗。还有在节点内,相比于内存,磁盘IO对job完成时间的影响也是可观的。从最基本的要求来说,我们对Shuffle过程的期望可以有: 完整地从map task端拉取数据到reduce端。 在跨节点拉取数据时,尽可能地减少对带宽的不必要消耗。 减少磁盘IO对task执行的影响。 优化的地方主要在于减少拉取数据的量及尽量使用内存而不是磁盘。 数据定义格式 map: (K1,V1) → list (K2,V2) reduce: (K2,list(V2)) → list (K3,V3) MapReduce执行过程:Mapper、Combiner、Partitioner、Shuffle and Sort、Reducer。

    Key&Value类型

    必须可序列化(serializable) 作用:网络传输以及持久化存储 IntWritable、LongWriteable、FloatWritable、Text、DoubleWritable, BooleanWritable、NullWritable等 都继承了Writable接口 并实现write()和readFields()方法 Keys必须实现WritableComparable接口 Reduce阶段需要sort keys需要可比较

    代码实例

    Mapper类

    package kb07.mr; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * keyin: * valuein: * keyout: * valueout: * */ public class WCMapper extends Mapper<LongWritable,Text,Text, IntWritable> { Text k=new Text(); IntWritable v=new IntWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //将文本转换为string String line= value.toString(); //将字符串切割 String[] words=line.split("\\s+"); //将每个单词写出去 for (String word : words) { k.set(word); context.write(k,v); } } }

    Reducer类

    package kb07.mr; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /** *keyin:reduce输入的key类型,即map端输出的key类型 * valuein: * */ public class WCReducer extends Reducer<Text, IntWritable,Text,IntWritable> { int sum; IntWritable v=new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { //reduce端接受到的类型,大概是 wish,(1,1,1,1,1,1) sum=0; //遍历迭代器 for (IntWritable count : values) { //对迭代器进行累加求和 sum+=count.get(); } //将key和value进行写出 v.set(sum); context.write(key,v); } }

    Driver类

    package kb07.mr; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WCDriver { public static void main(String[] args) throws Exception{ //创建配置文件 Configuration conf=new Configuration(); Job job=Job.getInstance(conf,"wordcount"); //设置jar的位置 job.setJarByClass(WCDriver.class); //设置map和reduce的位置 job.setMapperClass(WCMapper.class); job.setReducerClass(WCReducer.class); //设置map输出的key,value类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //设置reduce输出的key,value类型 job.setOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //设置输入输出路径 FileInputFormat.setInputPaths(job,new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); //提交执行 boolean result=job.waitForCompletion(true); System.exit(result?0:1); } }
    Processed: 0.015, SQL: 9