GitHub: https://github.com/skyerhxx/HDFS_WordCount
Word frequency count: wordcount
Write the final counts back out to HDFS.
Doing this with MapReduce or Spark would be trivial, but the requirement here is to use only the HDFS API.
① Read the file on HDFS ==> HDFS API
② Count word frequencies ==> split every line of the file on a separator ==> Mapper
③ Cache the intermediate results ==> Context
④ Write the results back to HDFS ==> HDFS API
HDFS_WordCount1.java
package com.imooc.bigdata.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * Word count implemented with the HDFS API only
 */
public class HDFS_WordCount1 {

    public static void main(String[] args) throws Exception {
        // 1) Read the file on HDFS ==> HDFS API
        Path input = new Path("/hdfsapi/test/hello.txt");

        // Get the HDFS file system to operate on
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop000:8020"), new Configuration(), "hadoop");

        RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(input, false);
        while (iterator.hasNext()) {
            LocatedFileStatus file = iterator.next();
            FSDataInputStream in = fs.open(file.getPath());
            BufferedReader reader = new BufferedReader(new InputStreamReader(in));
            String line = "";
            while ((line = reader.readLine()) != null) {
                // 2) Word frequency processing
            }
            reader.close();
            in.close();
        }

        // 3) Cache the results ==> Map
        Map<Object, Object> contextMap = new HashMap<Object, Object>();

        // 4) Write the results to HDFS ==> HDFS API
        Path output = new Path("/hdfsapi/output/");
        FSDataOutputStream out = fs.create(new Path(output, new Path("wc.out")));

        // Write the contents of the step-3 cache to out
        Set<Map.Entry<Object, Object>> entries = contextMap.entrySet();
        for (Map.Entry<Object, Object> entry : entries) {
            out.write((entry.getKey().toString() + "\t" + entry.getValue() + "\n").getBytes());
        }
        out.close();
        fs.close();

        System.out.println("hxx's HDFS API word count ran successfully......");
    }
}
HDFS_WordCount1.java (updated to use HxxContext)
package com.imooc.bigdata.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.Map;
import java.util.Set;

/**
 * Word count implemented with the HDFS API only
 */
public class HDFS_WordCount1 {

    public static void main(String[] args) throws Exception {
        // 1) Read the file on HDFS ==> HDFS API
        Path input = new Path("/hdfsapi/test/hello.txt");

        // Get the HDFS file system to operate on
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop000:8020"), new Configuration(), "hadoop");

        RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(input, false);
        while (iterator.hasNext()) {
            LocatedFileStatus file = iterator.next();
            FSDataInputStream in = fs.open(file.getPath());
            BufferedReader reader = new BufferedReader(new InputStreamReader(in));
            String line = "";
            while ((line = reader.readLine()) != null) {
                // 2) Word frequency processing
                // Write the results into the cache
            }
            reader.close();
            in.close();
        }

        // 3) Cache the results ==> Map
        HxxContext context = new HxxContext();
        Map<Object, Object> contextMap = context.getCacheMap();

        // 4) Write the results to HDFS ==> HDFS API
        Path output = new Path("/hdfsapi/output/");
        FSDataOutputStream out = fs.create(new Path(output, new Path("wc.out")));

        // Write the contents of the step-3 cache to out
        Set<Map.Entry<Object, Object>> entries = contextMap.entrySet();
        for (Map.Entry<Object, Object> entry : entries) {
            out.write((entry.getKey().toString() + "\t" + entry.getValue() + "\n").getBytes());
        }
        out.close();
        fs.close();

        System.out.println("hxx's HDFS API word count ran successfully......");
    }
}

HxxContext.java
package com.imooc.bigdata.hadoop.hdfs;

import java.util.HashMap;
import java.util.Map;

/**
 * Custom context; essentially a cache
 */
public class HxxContext {

    private Map<Object, Object> cacheMap = new HashMap<Object, Object>();

    public Map<Object, Object> getCacheMap() {
        return cacheMap;
    }

    // Write data into the cache
    // key is the word, value is its count
    public void write(Object key, Object value) {
        cacheMap.put(key, value);
    }

    // Read a value from the cache
    // key: the word; returns that word's count
    public Object get(Object key) {
        return cacheMap.get(key);
    }
}
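The word-count logic itself (the "// 2) Word frequency processing" step) lives in a mapper abstraction: HDFS_WordCount2 further below instantiates an HxxMapper / WordCountMapper, but their source is not listed in these notes. A minimal sketch of what those two files might look like, inferred from how they are called (the separator and the counting details are my assumptions):

package com.imooc.bigdata.hadoop.hdfs;

// Sketch of the mapper abstraction used by HDFS_WordCount2 below.
// Only the map(String, HxxContext) call shape comes from that code; the rest is an assumption.
public interface HxxMapper {
    // Process one line of input and record word counts in the context (cache)
    void map(String line, HxxContext context);
}

package com.imooc.bigdata.hadoop.hdfs;

// Sketch of a possible word-count mapper: splits each line on a separator
// (assumed to be a single space) and increments each word's count in the cache.
public class WordCountMapper implements HxxMapper {

    @Override
    public void map(String line, HxxContext context) {
        String[] words = line.split(" ");
        for (String word : words) {
            Object count = context.get(word);
            if (count == null) {
                context.write(word, 1);
            } else {
                context.write(word, Integer.parseInt(count.toString()) + 1);
            }
        }
    }
}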
At this point the word-count functionality is in place.
However, everything so far is hard-coded: the path of the file to count and the HDFS URI are written directly into the program.
So let's change that and move these settings into a configuration file, making them configurable.
Created a resources directory and a wc.properties file under it.
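The exact contents of wc.properties are not shown in these notes; something along the following lines would match the Constants keys used in HDFS_WordCount2 below (the key names and values here are assumptions):

# wc.properties (illustrative; key names and values are assumptions)
INPUT_PATH=/hdfsapi/test/hello.txt
OUTPUT_PATH=/hdfsapi/output/
OUTPUT_FILE=wc.out
HDFS_URI=hdfs://hadoop000:8020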
Rewrote HDFS_WordCount1 and saved it as HDFS_WordCount2:
package com.imooc.bigdata.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

/**
 * Word count implemented with the HDFS API only
 */
public class HDFS_WordCount2 {

    public static void main(String[] args) throws Exception {
        // 1) Read the file on HDFS ==> HDFS API
        Properties properties = ParamsUtils.getProperties();
        Path input = new Path(properties.getProperty(Constants.INPUT_PATH));

        // Get the HDFS file system to operate on
        FileSystem fs = FileSystem.get(new URI(properties.getProperty(Constants.HDFS_URI)), new Configuration(), "hadoop");

        RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(input, false);

        HxxMapper mapper = new WordCountMapper();
        HxxContext context = new HxxContext();

        while (iterator.hasNext()) {
            LocatedFileStatus file = iterator.next();
            FSDataInputStream in = fs.open(file.getPath());
            BufferedReader reader = new BufferedReader(new InputStreamReader(in));
            String line = "";
            while ((line = reader.readLine()) != null) {
                // 2) Word frequency processing
                // Write the results into the cache
                mapper.map(line, context);
            }
            reader.close();
            in.close();
        }

        // 3) Cache the results ==> Map
        Map<Object, Object> contextMap = context.getCacheMap();

        // 4) Write the results to HDFS ==> HDFS API
        Path output = new Path(properties.getProperty(Constants.OUTPUT_PATH));
        FSDataOutputStream out = fs.create(new Path(output, new Path(properties.getProperty(Constants.OUTPUT_FILE))));

        // Write the contents of the step-3 cache to out
        Set<Map.Entry<Object, Object>> entries = contextMap.entrySet();
        for (Map.Entry<Object, Object> entry : entries) {
            out.write((entry.getKey().toString() + "\t" + entry.getValue() + "\n").getBytes());
        }
        out.close();
        fs.close();

        System.out.println("hxx's HDFS API word count ran successfully......");
    }
}
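ParamsUtils and Constants are referenced above but not listed in these notes either. A minimal sketch of what they could look like, assuming the property keys from the wc.properties sample earlier (the class bodies are my assumptions):

package com.imooc.bigdata.hadoop.hdfs;

// Sketch: the property-key constants referenced by HDFS_WordCount2 (values are assumptions)
public class Constants {
    public static final String INPUT_PATH  = "INPUT_PATH";
    public static final String OUTPUT_PATH = "OUTPUT_PATH";
    public static final String OUTPUT_FILE = "OUTPUT_FILE";
    public static final String HDFS_URI    = "HDFS_URI";
}

package com.imooc.bigdata.hadoop.hdfs;

import java.io.IOException;
import java.util.Properties;

// Sketch: loads wc.properties from the classpath (i.e. from the resources directory)
public class ParamsUtils {

    private static Properties properties = new Properties();

    static {
        try {
            properties.load(ParamsUtils.class.getClassLoader()
                    .getResourceAsStream("wc.properties"));
        } catch (IOException e) {
            throw new RuntimeException("Failed to load wc.properties", e);
        }
    }

    public static Properties getProperties() {
        return properties;
    }
}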
What if I want to make the count case-insensitive?
Then only two places need to change, as sketched below.
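This is my reading of "two places": add a case-insensitive mapper implementation, and change the one line in HDFS_WordCount2 that instantiates the mapper. A sketch, with an assumed class name:

package com.imooc.bigdata.hadoop.hdfs;

// Sketch of a case-insensitive mapper: lower-cases each word before counting.
// The class name and the separator are assumptions.
public class CaseIgnoreWordCountMapper implements HxxMapper {

    @Override
    public void map(String line, HxxContext context) {
        String[] words = line.toLowerCase().split(" ");
        for (String word : words) {
            Object count = context.get(word);
            context.write(word, count == null ? 1 : Integer.parseInt(count.toString()) + 1);
        }
    }
}

And in HDFS_WordCount2, use HxxMapper mapper = new CaseIgnoreWordCountMapper(); instead of new WordCountMapper().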
So when the framework is well designed, it is very convenient to use.
This is the so-called pluggable (plug-in) style of development/management:
components can be swapped out at any time.
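One way to push this further (a sketch, not necessarily how the repository does it) is to put the mapper class name into wc.properties as well and instantiate it by reflection, so swapping implementations becomes a pure configuration change:

# assumed extra entry in wc.properties
MAPPER_CLASS=com.imooc.bigdata.hadoop.hdfs.CaseIgnoreWordCountMapper

// In HDFS_WordCount2, instead of "HxxMapper mapper = new WordCountMapper();"
// ("MAPPER_CLASS" is an assumed property key; the class must have a no-arg constructor)
Class<?> clazz = Class.forName(properties.getProperty("MAPPER_CLASS"));
HxxMapper mapper = (HxxMapper) clazz.getDeclaredConstructor().newInstance();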
