Distributed Computing Model MapReduce: A WordCount Example

    Implementing WordCount with MapReduce

    The job is split into a map phase that turns each input line into (word, 1) pairs and a reduce phase that sums the counts for each word; a driver class wires the two together and submits the job.

    Writing the Mapper class

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    /**
     * KEYIN    : input key type (byte offset of the current line)
     * VALUEIN  : input value type (the line of text)
     * KEYOUT   : output key type (the word)
     * VALUEOUT : output value type (the count 1)
     */
    public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        // Reusable output key/value objects
        private final Text k = new Text();
        private final IntWritable v = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // 1. Convert the line of text to a String
            String line = value.toString();
            // 2. Split the line on whitespace
            String[] words = line.split("\\s+");
            // 3. Emit (word, 1) for each word
            for (String word : words) {
                k.set(word);
                context.write(k, v);
            }
        }
    }
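
    The map logic itself is plain Java, so it can be checked without a cluster. Below is a minimal standalone sketch of the same split-and-emit step applied to one sample line; the class name MapSketch and the sample text are made up for illustration:

    public class MapSketch {
        public static void main(String[] args) {
            // A hypothetical input line
            String line = "wish you well wish you luck";
            // Same whitespace split as WCMapper.map()
            for (String word : line.split("\\s+")) {
                // Where WCMapper would call context.write(k, v)
                System.out.println("(" + word + ", 1)");
            }
            // Prints (wish, 1) (you, 1) (well, 1) (wish, 1) (you, 1) (luck, 1)
        }
    }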

    Writing the Reducer class

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    /**
     * KEYIN    : reduce-side input key type, i.e. the map output key type
     * VALUEIN  : reduce-side input value type, i.e. the map output value type
     * KEYOUT   : reduce-side output key type
     * VALUEOUT : reduce-side output value type
     */
    public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private int sum;
        // Reusable output value object
        private final IntWritable v = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // The reduce input looks roughly like (wish, (1, 1, 1, 1, 1, 1, 1))
            sum = 0;
            // Iterate over the grouped values and accumulate the total
            for (IntWritable count : values) {
                sum += count.get();
            }
            // Write out the word and its total count
            v.set(sum);
            context.write(key, v);
        }
    }
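
    Between the two phases the framework shuffles and sorts the map output, grouping all values for the same key, so each reduce() call receives one word together with all of its 1s. The sketch below simulates that aggregation over a hand-built list; the class name ReduceSketch and the sample data are assumptions for illustration:

    import java.util.Arrays;
    import java.util.List;

    public class ReduceSketch {
        public static void main(String[] args) {
            // Simulates the grouped input (wish, (1, 1, 1)) after the shuffle
            List<Integer> values = Arrays.asList(1, 1, 1);
            int sum = 0;
            for (int count : values) {
                sum += count; // same accumulation as WCReducer.reduce()
            }
            System.out.println("(wish, " + sum + ")"); // prints (wish, 3)
        }
    }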

    Writing the Driver class

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class WCDriver {
        public static void main(String[] args)
                throws IOException, ClassNotFoundException, InterruptedException {
            // 1. Create the configuration and the Job
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "wordcount");
            // 2. Locate the jar via the driver class
            job.setJarByClass(WCDriver.class);
            // 3. Set the Mapper and Reducer classes
            job.setMapperClass(WCMapper.class);
            job.setReducerClass(WCReducer.class);
            // 4. Set the map output key/value types
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            // 5. Set the reduce output key/value types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            // 6. Set the input and output paths
            FileInputFormat.setInputPaths(job, new Path("file:///D:\\Users\\IdeaProjects\\hadoop02\\data\\wcinput"));
            FileOutputFormat.setOutputPath(job, new Path("file:///D:\\Users\\IdeaProjects\\hadoop02\\data\\wcoutput"));
            // For cluster runs, take the paths from the command line instead:
            // FileInputFormat.setInputPaths(job, new Path(args[0]));
            // FileOutputFormat.setOutputPath(job, new Path(args[1]));
            // 7. Submit the job and wait for it to finish
            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);
        }
    }
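
    With the commented-out args[0]/args[1] lines enabled instead of the hard-coded local paths, the job can be packaged into a jar and submitted to a cluster. A typical invocation might look like the following; the jar name wc.jar and the HDFS paths are placeholders:

    hadoop jar wc.jar WCDriver /wcinput /wcoutput
    hdfs dfs -cat /wcoutput/part-r-00000

    Note that the output directory must not exist before the job runs; FileOutputFormat refuses to overwrite an existing path.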