1、需要的maven依赖包
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.6.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.6.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.6.0</version> </dependency> <dependency> <groupId>commons-logging</groupId> <artifactId>commons-logging</artifactId> <version>1.2</version> </dependency>2、创建并编写WCMapper
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** *KEYIN:输入的key类型 * VALUEIN:输入的value类型 * KEYOUT:输出的key类型 * VALUEOUT:输出的value类型 */ public class WCMapper extends Mapper<LongWritable,Text,Text,IntWritable> { Text k=new Text(); IntWritable v=new IntWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //1、将文本转化成string String line=value.toString(); //2、将字符切割 String[] words=line.split("\\s+"); //3、将每一个单词写出 for (String word : words) { k.set(word); context.write(k,v); } } }3、创建并编写WCReducer
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /** *KEYIN:reduce端输入的key类型,即map端输出的key类型 * VALUEIN:reduce端输入的value类型,即map端输出的value类型 * KEYOUT:reduce端输出的key类型 * VALUEOUT:reduce端输出的value类型 */ public class WCReducer extends Reducer<Text, IntWritable,Text,IntWritable>{ int sum; IntWritable v=new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { //reduce端接收到的类型大概是这样的 (wish,(1,1,1,1,1,1,1)) //对迭代器进行累加求和 sum=0; for (IntWritable count : values) { //对迭代器进行累加求和 sum+=count.get(); } //将key和value进行写出 v.set(sum); context.write(key,v); } }4、创建并编写WCDriver
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.Job; import java.io.IOException; public class WCDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //1、创建配置文件 Configuration conf=new Configuration(); Job job=Job.getInstance(conf,"wordcount"); //2、设置jar的位置 job.setJarByClass(WCDriver.class); //3、设置Map和Reduce的位置、 job.setMapperClass(WCMapper.class); job.setReducerClass(WCReducer.class); //4、设置map输出的key,value类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //5、设置reduce输出的key,value类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //6、设置输入输出路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); //7、提交程序运行 boolean result = job.waitForCompletion(true); System.exit(result ? 0:1); } }下面以一个简单的流程图来了解,WordCount词频统计中MapReduce的执行流程 我用红色的框把图分成了4个部分: 1、Splitting部分 把要读取的文件分成几个部分,分到不同的机器上实现并行处理。 需要注意的是,这里需要经过切片处理,就是保证不能把一个单词分成两个部分。切片是Mapper中的逻辑概念,切片是Mapper任务。
2、Map和Combine部分 从Splitting部分接收到文件的一部分,Map将字符切割后得到了一些键值对<key,value>,key就是每一个单词,value就是根据单词的出现频次得到的:(1,1,1,1,1…)出现多少次就有多少个1。 然后再由Combine进行局部的统计后,又得到了一些键值对<key,value>,这里的value就是Map中的1相加的结果。 需要注意的是,这一部分也是在不同的机器上完成的,所以得到的只是局部的数据。到这里Mapper的任务就基本完成了。
3、Shuffle/Sort部分 这里是Mapper和Reducer的中间部分,它的每一个处理步骤都分散在map task和reduce task节点上,整体看来,就是对Map的结果进行分区、排序、分割。也就是对Combine中的value进行分区,排序后合并。
4、Reduce部分 接受经过Shuffle后得到的<key,value>,把各value的值相加,得到最后每一个单词的出现次数。
以上就是我一个hadoop初学者对于MapReduce的执行流程的简单理解,如果有说的不对的地方,还望大佬们不吝赐教。