MapReduce实现改进版WordCount词频统计

    技术2022-07-11  88

    新手入门MapReduce实现改进版WordCount词频统计

    一、实验任务要求

    本实验是为了实现改进版的词频统计WordCount。要求根据所给的英文名著数据集和停用词表,统计英文名著数据集中词频,过滤掉停用词,将统计结果按照词频降序排序,词频相同的词按照字典序排序。

    英文名著数据集和停用词表再此处(提取码:ceij)

    二、实验工具和环境配置说明

    电脑安装了Vmware软件,搭建Centos7系统环境,配置了Hadoop2.7.4单机伪分布环境并安装eclipse编程软件,使用Java语言完成实验任务。

    三、步骤

    首先我们对实验任务进行划分,将实验任务划分为两个子任务:词频统计和排序,针对两个子任务job分别进行设计。

    1.词频统计

    (1)首先,我们在这个任务阶段需要英文名著数据集和停用词表,统计是针对英文名著数据集,但要过滤掉停用词,所以map阶段要判断接收到的词是不是停用词,不是则输出。这就要保证每个map都能使用停用词表,所以,就要使用map中的Setup把这个停用词表变成一个全局变量; 在main函数里的第一个job中加入缓存文件停用词表stopwords.txt。 在Mapper中的设置一个空列表li,在Mapper中的setup阶段读缓存文件,将其中的每一个停用词加入到列表中。 (2)获得了全局变量停用词列表后,接下来就要获得文章的每个词,判断词如果不是停用词,输出<key=词,value=词频1>。 将英文名著按行读取,使用空格等标点符号分割成一个个词,将词化为小写,如果词不在停用词列表中,输出。 (3)Reducer收到词,将key即相同的词merge合并累计词频。

    2.排序

    (1)首先,我们排序是对任务一的词频文件进行排序,由任务一job1,我们已经获得了统计好的词频文件存储在test文件夹中,将词频统计job1的输出路径作为排序job2的输入路径;

    (2)然而我们排序不是简单的按词的字典序排序,也不是按照词频排序,而是将二者综合在一起排序,所以我们要自定义排序方式,由于MapReduce是根据key来排序,所以我们要自定义key的数据类型从而自定义排序方式。 我自定义了一个数据类型myclass,其中两个变量x代表词频,y代表词。 排序方式定义为按词频降序,词频相同按照词的字典序排序。

    (3)Mapper中读取词和词频,构造输出<key,value>为类型为myclass,IntWritable。 (4)Reducer把输出的key再改回词。

    实验结果与展示:

    所有代码如下:

    import java.io.BufferedReader; import java.io.DataInput; import java.io.DataOutput; import java.io.File; import java.io.FileReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.nio.file.FileStore; import java.nio.file.PathMatcher; import java.nio.file.WatchService; import java.nio.file.attribute.UserPrincipalLookupService; import java.nio.file.spi.FileSystemProvider; import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); static List<String> li = new ArrayList<String>(); @Override protected void setup(Context context)throws IOException, InterruptedException {//获取缓存文件路径的数组 Path [] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration()); System.out.println(paths); BufferedReader sb = new BufferedReader(new FileReader(paths[0].toUri().getPath())); //读取BufferedReader里面的数据 String tmp = null; while ( (tmp = sb.readLine()) != null) { String ss []= tmp.split(" "); for (String s : ss) { li.add(s); } } //关闭sb对象 sb.close(); System.out.println("+++++++"+li); } public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()," \'\",!.?(){}-:;@#&*/[]`s<>_"); while (itr.hasMoreTokens()) { String tmpword = itr.nextToken(); if(!li.contains(tmpword.toLowerCase())){ word.set(tmpword.toLowerCase()); context.write(word, one); } } } } public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static class myclass implements WritableComparable<myclass> { public int x; public String y; public int getX() { return x; } public String getY() { return y; } public void readFields(DataInput in) throws IOException { x = in.readInt(); y = in.readUTF(); } public void write(DataOutput out) throws IOException { out.writeInt(x); out.writeUTF(y); } public int compareTo(myclass p) { if (this.x > p.x) { return -1; } else if (this.x < p.x) { return 1; } else { if (this.getY().compareTo(p.getY()) < 0) { return -1; } else if (this.getY().compareTo(p.getY()) > 0) { return 1; } else { return 0; } } } } public static class Mysorter extends IntWritable.Comparator { public int compare(WritableComparable a, WritableComparable b) { return -super.compare(a, b); } public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return -super.compare(b1, s1, l1, b2, s2, l2); } } public static class TokenizerMapper1 extends Mapper<Object, Text, myclass, IntWritable>{ //private Text keyInfo = new Text(); private IntWritable valueInfo = new IntWritable(); @Override public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer( value.toString()); String[] word = value.toString().split("\\s+"); myclass keyInfo = new myclass(); keyInfo.x=Integer.parseInt(word[word.length-1]); keyInfo.y=word[word.length-2]; //keyInfo.set( word[0]); valueInfo.set(Integer.parseInt(word[word.length-1])); context.write(keyInfo, valueInfo); } } public static class IntSumReducer1 extends Reducer<myclass,IntWritable,Text,IntWritable> { private IntWritable valueInfo = new IntWritable(); private Text keyInfo = new Text(); public void reduce(myclass key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { keyInfo.set(key.y); valueInfo.set(key.x); context.write(keyInfo,valueInfo); } } public static void main(String[] args) throws Exception { //任务一 Configuration conf = new Configuration(); //FileSystem hdfs= FileSystem.get(conf); //conf.set("stop", "hdfs://localhost:9000/input/stopwords.txt"); //DistributedCache.addCacheFile(new URI("hdfs://localhost:9000/discache"), conf); //String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); String[] otherArgs = new String[]{"hdfs://localhost:9000/input/index/data", "hdfs://localhost:9000/output/test"}; if (otherArgs.length < 2) { System.err.println("Usage: wordcount <in> [<in>...] <out>"); System.exit(2); } Job job1 = Job.getInstance(conf, "word count"); job1.setJarByClass(WordCount.class); //设置分布式缓存文件 job1.addCacheFile(new URI("hdfs://localhost:9000/input/stopwords/stopwords.txt")); job1.setMapperClass(TokenizerMapper.class); job1.setCombinerClass(IntSumReducer.class); job1.setReducerClass(IntSumReducer.class); job1.setOutputKeyClass(Text.class); job1.setOutputValueClass(IntWritable.class); for (int i = 0; i < otherArgs.length - 1; ++i) { FileInputFormat.addInputPath(job1, new Path(otherArgs[i])); } FileOutputFormat.setOutputPath(job1, new Path(otherArgs[otherArgs.length - 1])); job1.waitForCompletion(true); //任务2 Configuration conf1 = new Configuration(); Job job2 = Job.getInstance(conf1, "sort"); job2.setMapperClass(TokenizerMapper1.class); job2.setReducerClass(IntSumReducer1.class); job2.setMapOutputKeyClass(myclass.class); job2.setMapOutputValueClass(IntWritable.class); job2.setOutputKeyClass(Text.class); job2.setOutputValueClass(IntWritable.class); //job1.setSortComparatorClass(Mysorter.class); FileInputFormat.addInputPath(job2, new Path(otherArgs[otherArgs.length - 1])); FileOutputFormat.setOutputPath(job2, new Path("hdfs://localhost:9000/output/sort")); System.exit(job2.waitForCompletion(true) ? 0 : 1); } }
    Processed: 0.017, SQL: 9