综合性HDFS实战——HDFS文件词频统计

    技术2026-04-02  6

    github:https://github.com/skyerhxx/HDFS_WordCount

     

    使用HDFS Java API完成HDFS文件系统上的文件的词频统计

    词频统计:wordcount

    将统计完的结果输出到HDFS上去

     

    用mapreduce或者spark操作很简单,但是我们这里的要求是只允许使用HDFS API进行操作

     

    功能拆解

    ①读取HDFS上的文件 ==> HDFS API

    ②词频统计 ==> 对文件中的每一行数据进行 按照分隔符分割 ==> Mapper

    ③将处理结果缓存起来 ==> Context

    ④将结果输出到HDFS ==> HDFS API

     

    代码框架编写

    HDFS_WordCount1.java

    package com.imooc.bigdata.hadoop.hdfs; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import java.io.BufferedReader; import java.io.InputStreamReader; import java.net.URI; import java.util.HashMap; import java.util.Map; import java.util.Set; /** * 使用HDFS API完成wordcount统计 * */ public class HDFS_WordCount1 { public static void main(String[] args) throws Exception{ // 1)读取HDFS上的文件 ==> HDFS API Path input = new Path("/hdfsapi/test/hello.txt"); //获取要操作的HDFS文件系统 FileSystem fs = FileSystem.get(new URI("hdfs://hadoop000:8020"),new Configuration(),"hadoop"); RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(input,false); while(iterator.hasNext()){ LocatedFileStatus file = iterator.next(); FSDataInputStream in = fs.open(file.getPath()); BufferedReader reader = new BufferedReader(new InputStreamReader(in)); String line = ""; while((line = reader.readLine()) != null){ // 2)词频处理 } reader.close(); in.close(); } // 3)将结果缓存起来 Map Map<Object, Object> contextMap = new HashMap<Object, Object>(); // 4)将结果输出到HDFS ==> HDFS API Path output = new Path("/hdfsapi/output/"); FSDataOutputStream out = fs.create(new Path(output,new Path("wc.out"))); //将第三步缓存中的内容输出到out中去 Set<Map.Entry<Object, Object>> entries = contextMap.entrySet(); for(Map.Entry<Object,Object> entry:entries){ out.write((entry.getKey().toString() + "\t" + entry.getValue() + "\n").getBytes()); } out.close(); fs.close(); System.out.println("hxx的HDFS API统计词频运行成功......"); } }

     

    自定义上下文

    HDFS_WordCount.java

    package com.imooc.bigdata.hadoop.hdfs; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import java.io.BufferedReader; import java.io.InputStreamReader; import java.net.URI; import java.util.HashMap; import java.util.Map; import java.util.Set; /** * 使用HDFS API完成wordcount统计 * */ public class HDFS_WordCount1 { public static void main(String[] args) throws Exception{ // 1)读取HDFS上的文件 ==> HDFS API Path input = new Path("/hdfsapi/test/hello.txt"); //获取要操作的HDFS文件系统 FileSystem fs = FileSystem.get(new URI("hdfs://hadoop000:8020"),new Configuration(),"hadoop"); RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(input,false); while(iterator.hasNext()){ LocatedFileStatus file = iterator.next(); FSDataInputStream in = fs.open(file.getPath()); BufferedReader reader = new BufferedReader(new InputStreamReader(in)); String line = ""; while((line = reader.readLine()) != null){ // 2)词频处理 //将结果写到Cache中去 } reader.close(); in.close(); } // 3)将结果缓存起来 Map HxxContext context = new HxxContext(); Map<Object, Object> contextMap = context.getCacheMap(); // 4)将结果输出到HDFS ==> HDFS API Path output = new Path("/hdfsapi/output/"); FSDataOutputStream out = fs.create(new Path(output,new Path("wc.out"))); //将第三步缓存中的内容输出到out中去 Set<Map.Entry<Object, Object>> entries = contextMap.entrySet(); for(Map.Entry<Object,Object> entry:entries){ out.write((entry.getKey().toString() + "\t" + entry.getValue() + "\n").getBytes()); } out.close(); fs.close(); System.out.println("hxx的HDFS API统计词频运行成功......"); } }

    HxxContext.java

    package com.imooc.bigdata.hadoop.hdfs; import java.io.ObjectOutputStream; import java.util.HashMap; import java.util.Map; /** * 自定义上下文,其实就是缓存 */ public class HxxContext { private Map<Object,Object> cacheMap = new HashMap<Object, Object>(); public Map<Object,Object> getCacheMap(){ return cacheMap; } //写数据到缓存中去 //key是单词 value是次数 public void write(Object key, Object value){ cacheMap.put(key,value); } //从缓存中获取值 //key 单词 return 单词对应的词频 public Object get(Object key){ return cacheMap.get(key); } }

     

    自定义处理类+功能实现

    现在已经实现了词频统计的功能了

     

    使用自定义配置文件重构代码

    我们现在的编码都是硬编码,要统计的文件的路径、URI地址都是自己写到程序里的

    所以我们要修改一下,改成写在配置文件里 可配置的

    新建了resources和wc.properties文件

    改写了HDFS_WordCount1,保存为HDFS_WordCount2

    package com.imooc.bigdata.hadoop.hdfs; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import java.io.BufferedReader; import java.io.InputStreamReader; import java.net.URI; import java.util.Map; import java.util.Properties; import java.util.Set; /** * 使用HDFS API完成wordcount统计 * */ public class HDFS_WordCount2 { public static void main(String[] args) throws Exception{ // 1)读取HDFS上的文件 ==> HDFS API Properties properties = ParamsUtils.getProperties(); Path input = new Path(properties.getProperty(Constants.INPUT_PATH)); //获取要操作的HDFS文件系统 FileSystem fs = FileSystem.get(new URI(properties.getProperty(Constants.HDFS_URI)),new Configuration(),"hadoop"); RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(input,false); HxxMapper mapper = new WordCountMapper(); HxxContext context = new HxxContext(); while(iterator.hasNext()){ LocatedFileStatus file = iterator.next(); FSDataInputStream in = fs.open(file.getPath()); BufferedReader reader = new BufferedReader(new InputStreamReader(in)); String line = ""; while((line = reader.readLine()) != null){ // 2)词频处理 //将结果写到Cache中去 mapper.map(line, context); } reader.close(); in.close(); } // 3)将结果缓存起来 Map Map<Object, Object> contextMap = context.getCacheMap(); // 4)将结果输出到HDFS ==> HDFS API Path output = new Path(properties.getProperty(Constants.OUTPUT_PATH)); FSDataOutputStream out = fs.create(new Path(output,new Path(properties.getProperty(Constants.OUTPUT_FILE)))); //将第三步缓存中的内容输出到out中去 Set<Map.Entry<Object, Object>> entries = contextMap.entrySet(); for(Map.Entry<Object,Object> entry:entries){ out.write((entry.getKey().toString() + "\t" + entry.getValue() + "\n").getBytes()); } out.close(); fs.close(); System.out.println("hxx的HDFS API统计词频运行成功......"); } }

     

    使用反射创建自定义Mapper对象

    package com.imooc.bigdata.hadoop.hdfs; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import java.io.BufferedReader; import java.io.InputStreamReader; import java.net.URI; import java.util.Map; import java.util.Properties; import java.util.Set; /** * 使用HDFS API完成wordcount统计 * */ public class HDFS_WordCount2 { public static void main(String[] args) throws Exception{ // 1)读取HDFS上的文件 ==> HDFS API Properties properties = ParamsUtils.getProperties(); Path input = new Path(properties.getProperty(Constants.INPUT_PATH)); //获取要操作的HDFS文件系统 FileSystem fs = FileSystem.get(new URI(properties.getProperty(Constants.HDFS_URI)),new Configuration(),"hadoop"); RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(input,false); //通过反射创建对象 Class<?> clazz = Class.forName(properties.getProperty(Constants.MAPPER_CLASS)); clazz.newInstance(); HxxMapper mapper = (HxxMapper)clazz.newInstance(); //HxxMapper mapper = new WordCountMapper(); HxxContext context = new HxxContext(); while(iterator.hasNext()){ LocatedFileStatus file = iterator.next(); FSDataInputStream in = fs.open(file.getPath()); BufferedReader reader = new BufferedReader(new InputStreamReader(in)); String line = ""; while((line = reader.readLine()) != null){ // 2)词频处理 //将结果写到Cache中去 mapper.map(line, context); } reader.close(); in.close(); } // 3)将结果缓存起来 Map Map<Object, Object> contextMap = context.getCacheMap(); // 4)将结果输出到HDFS ==> HDFS API Path output = new Path(properties.getProperty(Constants.OUTPUT_PATH)); FSDataOutputStream out = fs.create(new Path(output,new Path(properties.getProperty(Constants.OUTPUT_FILE)))); //将第三步缓存中的内容输出到out中去 Set<Map.Entry<Object, Object>> entries = contextMap.entrySet(); for(Map.Entry<Object,Object> entry:entries){ out.write((entry.getKey().toString() + "\t" + entry.getValue() + "\n").getBytes()); } out.close(); fs.close(); System.out.println("hxx的HDFS API统计词频运行成功......"); } }

     

    可插拔业务逻辑处理

    如果我要把词频统计改成大小写不敏感(即忽略单词大小写)的呢

    就改这两个地方就行了

    所以,框架设计得合理的话,对于我们使用起来是非常方便的

    这就是一个所谓的可插拔开发/管理方式  plug in

    随时可以替换

     

     

     

    Processed: 0.011, SQL: 9