MapReduce编程-统计词频

    技术2022-07-11  82

    MapReduce编程-统计词频

    统计词频

    统计词频

    Map

    /** * KEYIN:输入的key类型 * VALUEIN:输入的value类型 * KEYOUT:输出的key类型 * VALUEOUT:输出的value类型 */ public class WCMapper extends Mapper<LongWritable, Text,Text, IntWritable> { //创建输出的k,v对象 Text k = new Text(); IntWritable v = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //1、将文本转化成string String line = value.toString(); //2、将字符串切割 String[] words = line.split("\\s+"); //3、将每一个单词循环写出 for (String word : words) { k.set(word); context.write(k,v); } } }

    Reducer

    /** * KEYIN:reduce段输入key类型,即map输出的key类型 * VALUEIN:reduce段输入value类型,即map输出的value类型 * KEYOUT:reduce输出的key类型 * VALUEOUT:reduce输出的value类型 */ public class WCReducer extends Reducer<Text,IntWritable, Text, IntWritable> { int sum=0; //创建输出v对象 IntWritable v = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { //reduce端接受到的类型大概是这样 (wish,(1,1,1, 1,1,1,1)) //遍历迭代器 for (IntWritable count : values) { //对迭代器进行累加求和 sum+=count.get(); } //将key和value进行写出 v.set(sum); context.write(key,v); } }

    Driver

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //1、创建配置文件 Configuration conf = new Configuration(); Job job = Job.getInstance(conf,"wordcount"); //2、设置jar位置 job.setJarByClass(WCDriver.class); //3、设置Map和Reducer的位置 job.setMapperClass(WCMapper.class); job.setReducerClass(WCReducer.class); //4、设置Map输出的key,value类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //5、设置reduce输出的key,value类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //6、设置输入输出路径 // FileInputFormat.setInputPaths(job,new Path(args[0])); // FileOutputFormat.setOutputPath(job,new Path(args[1])); FileInputFormat.setInputPaths(job,new Path("file:///D:\\softs\\ideaproject\\boke\\data\\ceshiinput")); FileOutputFormat.setOutputPath(job,new Path("file:///D:\\softs\\ideaproject\\boke\\data\\ceshionput")); //7、提交程序运行 boolean result = job.waitForCompletion(true); System.exit(result?0:1); }
    Processed: 0.021, SQL: 9