MapReduce 是一个分布式计算框架,它将大型数据操作作业分解为可以跨服务器集群并行,适用于大量数据的处理,每个job包含了 map 和 reduce 两部分。
工作原理: (附一张图,这里说的会比较清楚)
附带词频统计的代码:
// 1、首先建一个 map 类用于文件的分割 //map 是通过 键和值 来进行分割操作 //LongWritable 键的输入类型 Text 值的输入类型 Text 键的输出类型 IntWritable 值的输出类型 public class WCMapper extends Mapper<LongWritable, Text,Text,IntWritable> { //创建 K、V的输出对象 Text k = new Text(); IntWritable v = new IntWritable(1); //(创建一个值 用于统计每个单词出现的个数 我这选取1 作为统计的标准) //重写map 方法 @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //将文本转化为string 类型并输出 String line = value.toString(); //建一个string 类型的数组 接收 输出的内容 并且以 空格 为分隔符 进行分割 String [] words = line.split("\\s+"); //将每一个分割出来的字词 作为map 的key for (String word : words) { k.set(word); context.write(k,v); } } } //其次 创建一个reduce 类 进行值的个数的统计 public class WCReducer extends Reducer<Text,IntWritable,Text,IntWritable>{ //创建一个sum 用于统计个数 int sum = 0; // 创建输出的v对象 IntWritable v = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { // 通过迭代器进行遍历 values 的值 for(IntWritable count : values){ // 对迭代器进行累加求和 类似于某个单词出现一次就会自动累加一次 sum = sum + count.get(); } // 将key和value进行写出 v.set(sum); context.write(key,v); } } //创建驱动类 用于串联之前的map 和 reduce 类 public class WCDriver { public static void main(String[] args) throws Exception { //1,创建及配置文件 Configuration conf=new Configuration(); Job job=Job.getInstance(conf,"wordcount"); //2,设置jar的位置 job.setJarByClass(WCDriver.class); //3,设置map和reduce 的位置 job.setMapperClass(WCMapper.class); job.setReducerClass(WCReducer.class); //4,设置map的输出 key、value 类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //5,设置reduce的输出key、value 类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //设置reduce 的输出格式 job.setNumReduceTasks(3); //6,设置输入输出路径 //内容文件路径 FileInputFormat.setInputPaths(job,new Path("D:\\hadoop\\hadoop1.txt")); //输出文件路径 hadoop 文件夹不能存在 会报错 FileOutputFormat.setOutputPath(job,new Path("D:\\hbase\\hadoop")); //7.提交程序运行 boolean result =job.waitForCompletion(true); System.exit(result ? 0:1); } }