A Beginner's Head Start on MapReduce (with a Word-Count Example, a Traffic-Stats Example, and the Landmines I Stepped On)


    MapReduce is a distributed computing framework.

    It originated at Google.

    It breaks a large data-processing job down into individual tasks that can be executed in parallel across a cluster of servers.

    It is suited to large-scale data-processing scenarios:

        each node processes the data stored on that node

        each job consists of a Map phase and a Reduce phase

     

    Divide and conquer

    A programming model that simplifies parallel computing

    Built on two abstractions: Map and Reduce

    Developers focus on implementing the Mapper and Reducer functions

    System-level details are hidden away

     

    Advantages:

    Easy to program: implement a few interfaces and you have a working distributed program

    Highly scalable

    Highly fault-tolerant

    High throughput: it can churn through very large volumes of data

     

    Where it does not fit

    Hard to use for real-time computation: MapReduce is batch (offline) processing and cannot respond in real time

    Not suited to stream processing: it can only handle static data

    Not suited to DAG (directed acyclic graph) style computations

     

    Key and value types

    1. Keys and values must be serializable

       Purpose: network transfer and persistent storage

       Built-in types include IntWritable, LongWritable, FloatWritable, DoubleWritable, BooleanWritable, Text, NullWritable, and so on

    2. They all implement the Writable interface

        and provide the write() and readFields() methods

    3. Keys must additionally implement the WritableComparable interface

        the Reduce stage needs to sort

        so keys must be comparable (see the sketch right after this list)
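
    To make point 3 concrete, here is a minimal sketch of a custom key type. The class WordKey and its field are made up for this illustration only; they are not used in the word-count or flow-count code later in this post.

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.WritableComparable;

    // Hypothetical key type usable as a map output key:
    // serializable via write()/readFields(), sortable via compareTo().
    public class WordKey implements WritableComparable<WordKey> {
        private String word;

        public WordKey() {}                               // no-arg constructor required for deserialization

        public WordKey(String word) { this.word = word; }

        @Override
        public void write(DataOutput out) throws IOException {    // serialization
            out.writeUTF(word);
        }

        @Override
        public void readFields(DataInput in) throws IOException { // deserialization, same order as write()
            this.word = in.readUTF();
        }

        @Override
        public int compareTo(WordKey other) {                     // used when the framework sorts keys
            return this.word.compareTo(other.word);
        }
    }

    In practice such a key should also override hashCode() and equals() so the default HashPartitioner sends equal keys to the same reducer.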

     

    MapReduce programming model

    input: the input data is read and split

    map: receives a key and a value, emits intermediate key-value pairs

    combine: optional local aggregation on the map side, used as an optimization (see the Job configuration sketch after this list)

    partitioner: decides which reducer each intermediate key is sent to

    reduce: receives the grouped input and turns it into the final output

    output: the results are written out
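
    The combiner and partitioner are wired up on the Job object in the driver. A minimal sketch, assuming the WCMapper/WCReducer classes defined later in this post: reusing the reducer as the combiner is safe here only because word-count addition is associative, and HashPartitioner is in fact the default, shown explicitly just for illustration.

        // Added after job.setReducerClass(...) in the driver:
        job.setCombinerClass(WCReducer.class);       // local pre-aggregation on the map side
        job.setPartitionerClass(org.apache.hadoop.mapreduce.lib.partition.HashPartitioner.class);
        job.setNumReduceTasks(2);                    // number of partitions = number of reduce tasks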

     

    How a MapReduce job runs

    Roughly: the input is split, each split is handled by a map task, the map output is partitioned, shuffled and sorted by key, and the reduce tasks aggregate the grouped values into the final output files.

     

    Implementing WordCount with MapReduce (word-frequency counting)

    1. Create a Maven project and add the dependencies to its pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <groupId>cn.kgc</groupId>
        <artifactId>Mapreduce</artifactId>
        <version>1.0-SNAPSHOT</version>

        <properties>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
            <hadoop.version>2.6.0</hadoop.version>
        </properties>

        <dependencies>
            <!-- MapReduce -->
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-auth</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-core</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-hdfs</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
            <dependency>
                <groupId>commons-logging</groupId>
                <artifactId>commons-logging</artifactId>
                <version>1.2</version>
            </dependency>
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>1.2.17</version>
            </dependency>
            <dependency>
                <groupId>org.junit.jupiter</groupId>
                <artifactId>junit-jupiter-api</artifactId>
                <version>RELEASE</version>
                <scope>compile</scope>
            </dependency>
            <!-- junit was declared three times in my original pom; once is enough -->
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
                <scope>test</scope>
            </dependency>
        </dependencies>
    </project>

    2. Create a class for the mapper, WCMapper

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Extend Mapper and override the map method.
    // NB: the Writable types and Mapper all come from Hadoop's own packages.
    // The four type parameters are:
    //   KEYIN    - type of the input key
    //   VALUEIN  - type of the input value
    //   KEYOUT   - type of the output key
    //   VALUEOUT - type of the output value
    public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        Text k = new Text();
        IntWritable v = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Convert the line of text to a String
            String line = value.toString();

            // Split the line on whitespace
            String[] words = line.split("\\s+");

            // Write out each word with a count of 1
            for (String word : words) {
                k.set(word);
                context.write(k, v);
            }
        }
    }

    3. Create a class for the reducer, WCReducer

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    /**
     * Extend Reducer and override the reduce method.
     * KEYIN    : type of the key arriving at the reduce side, i.e. the map output key type
     * VALUEIN  : type of the value arriving at the reduce side, i.e. the map output value type
     * KEYOUT   : type of the key written out by the reducer
     * VALUEOUT : type of the value written out by the reducer
     */
    public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        // Reusable output value object
        IntWritable v = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // The reduce side receives input roughly of the form (wish, (1,1,1,1,1,1)).
            // The sum has to be reset for every key, so declare it inside reduce
            // rather than as an instance field (a field would accumulate across keys).
            int sum = 0;

            // Add up the values in the iterator
            for (IntWritable count : values) {
                sum += count.get();
            }

            // Write out the key and its total count
            v.set(sum);
            context.write(key, v);
        }
    }

     

    4. Write the driver class

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class WCDriver {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            // Create the configuration and the job
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "wordcount");
            // Lower the shuffle memory limit (a common workaround for shuffle errors in local mode)
            job.getConfiguration().setStrings("mapreduce.reduce.shuffle.memory.limit.percent", "0.15");

            // Tell Hadoop which jar to ship
            job.setJarByClass(WCDriver.class);

            // Set the mapper and reducer classes
            job.setMapperClass(WCMapper.class);
            job.setReducerClass(WCReducer.class);

            // Set the map output key/value types
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);

            // Set the final (reduce) output key/value types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            // Set the input and output paths.
            // Create an input folder, put a text file in it with some words,
            // and point the input path at that folder.
            FileInputFormat.setInputPaths(job, new Path("file:///D:\\Idea\\Mapreduce\\input"));
            // FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path("file:///D:\\Idea\\Mapreduce\\output"));
            // FileOutputFormat.setOutputPath(job, new Path(args[1]));

            // Submit the job and wait for it to finish
            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);
        }
    }
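
    One practical tweak worth considering (not part of the original code, so treat it as an optional addition): MapReduce refuses to start if the output directory already exists, so the driver can delete it first. This sketch assumes the conf and job objects from the driver above plus an extra import of org.apache.hadoop.fs.FileSystem.

        // Optional: remove a leftover output directory so repeated runs don't fail
        // with "Output directory ... already exists".
        Path outputPath = new Path("file:///D:\\Idea\\Mapreduce\\output");
        FileSystem fs = FileSystem.get(job.getConfiguration());
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);   // true = delete recursively
        }
        FileOutputFormat.setOutputPath(job, outputPath);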

     5. Run the driver class; the word counts land in a part-r-00000 file inside the output directory

    6. Pitfalls I stepped into

    1) Version mismatch

    The hadoop.dll in this directory and the one under the lib folder of my locally unpacked Hadoop were not the same file, i.e. the two copies did not match; they must be the same version.

    2) If the local Windows user name contains a space, two exceptions are thrown:

    shuffle error in local#1....

    Caused by: java.io.FileNotFoundException: D:/tmp/hadoop....

    After a lot of digging I learned that the file paths Hadoop uses at runtime must not contain spaces. They are derived from the current Windows user name, so I created a new local user whose name has no space in it, and the problem was solved. (Do not rename the existing user's folder; I tried that and was one step away from keeling over on the spot. Creating a new user is the best way.)
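
    An alternative I have seen suggested, which I did not verify myself, is to point Hadoop's temporary directory at a space-free path directly in the driver. hadoop.tmp.dir is a real Hadoop property, but whether this fully sidesteps the user-name problem is an assumption here.

        // Assumption: redirecting hadoop.tmp.dir keeps the local job runner's temp
        // files out of C:\Users\<name with a space>. D:/hadoop-tmp is a made-up path.
        Configuration conf = new Configuration();
        conf.set("hadoop.tmp.dir", "D:/hadoop-tmp");
        Job job = Job.getInstance(conf, "wordcount");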

    3) If your code is identical to mine but still shows up red in the IDE, the wrong packages were imported; double-check every import.

    Using MapReduce for per-user traffic statistics

    Continuing from the project above.

    The source data file has one record per line; the second field is the phone number, and the third-to-last and second-to-last fields are the uplink and downlink traffic.

    Create a new package, flowCount, directly in the same project.

    1. Create a new class. Because the input and output of both map and reduce must be key-value pairs, the phone's traffic information has to be wrapped in a Bean class, and that class is used as the value.

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.Writable;

    public class FlowBean implements Writable {
        private long upFlow;
        private long downFlow;
        private long sumFlow;

        // Serialization
        @Override
        public void write(DataOutput dataOutput) throws IOException {
            dataOutput.writeLong(upFlow);
            dataOutput.writeLong(downFlow);
            dataOutput.writeLong(sumFlow);
        }

        // Deserialization.
        // The order must not change: fields are read back exactly as they were written.
        @Override
        public void readFields(DataInput dataInput) throws IOException {
            this.upFlow = dataInput.readLong();
            this.downFlow = dataInput.readLong();
            this.sumFlow = dataInput.readLong();
        }

        public FlowBean() {}

        public FlowBean(long upFlow, long downFlow, long sumFlow) {
            this.upFlow = upFlow;
            this.downFlow = downFlow;
            this.sumFlow = sumFlow;
        }

        @Override
        public String toString() {
            return "FlowBean{" +
                    "upFlow=" + upFlow +
                    ", downFlow=" + downFlow +
                    ", sumFlow=" + sumFlow +
                    '}';
        }

        // Convenience setter used by the mapper and the reducer
        public void set(long upFlow, long downFlow) {
            this.upFlow = upFlow;
            this.downFlow = downFlow;
            this.sumFlow = upFlow + downFlow;
        }

        public long getUpFlow() { return upFlow; }
        public void setUpFlow(long upFlow) { this.upFlow = upFlow; }
        public long getDownFlow() { return downFlow; }
        public void setDownFlow(long downFlow) { this.downFlow = downFlow; }
        public long getSumFlow() { return sumFlow; }
        public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; }
    }

    2. FlowMapper extends Mapper and overrides the map method

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
        Text k = new Text();
        FlowBean v = new FlowBean();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split("\\s+");

            // The second field is the phone number
            String phoneNumber = fields[1];

            // The third-to-last and second-to-last fields are the uplink and downlink traffic
            long upFlow = Long.parseLong(fields[fields.length - 3]);
            long downFlow = Long.parseLong(fields[fields.length - 2]);

            k.set(phoneNumber);
            v.set(upFlow, downFlow);
            context.write(k, v);
        }
    }
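
    To make the index arithmetic concrete, here is a tiny standalone snippet. The record shown is hypothetical (the real data file is not reproduced in this post); only the field positions matter.

    public class FieldLayoutDemo {
        public static void main(String[] args) {
            // Hypothetical record: row id, phone number, IP, host, up bytes, down bytes, status
            String line = "1 13736230513 192.196.100.1 www.example.com 2481 24681 200";
            String[] fields = line.split("\\s+");
            System.out.println(fields[1]);                                  // phone number: 13736230513
            System.out.println(Long.parseLong(fields[fields.length - 3]));  // uplink traffic: 2481
            System.out.println(Long.parseLong(fields[fields.length - 2]));  // downlink traffic: 24681
        }
    }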

    3. FlowReducer extends Reducer and overrides the reduce method

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
        FlowBean v = new FlowBean();

        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
            // The reduce input looks roughly like ("13560439658", (FlowBean(198,4938), FlowBean(1116,954)))
            // Two accumulators, reset for every key
            long sum_upFlow = 0;
            long sum_downFlow = 0;

            // Add up the uplink and downlink traffic
            for (FlowBean flowBean : values) {
                sum_upFlow += flowBean.getUpFlow();
                sum_downFlow += flowBean.getDownFlow();
            }

            v.set(sum_upFlow, sum_downFlow);
            context.write(key, v);
        }
    }

    4. The driver class, essentially the same as in the previous example

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class FlowDriver {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            // Create the configuration and the job
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "flowCount");

            // Tell Hadoop which jar to ship
            job.setJarByClass(FlowDriver.class);

            // Set the mapper and reducer classes
            job.setMapperClass(FlowMapper.class);
            job.setReducerClass(FlowReducer.class);

            // Set the map output key/value types
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FlowBean.class);

            // Set the final (reduce) output key/value types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBean.class);

            // Set the input and output paths
            FileInputFormat.setInputPaths(job, new Path("file:///D:\\Idea\\Mapreduce\\input"));
            // FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path("file:///D:\\Idea\\Mapreduce\\output\\out2"));
            // FileOutputFormat.setOutputPath(job, new Path(args[1]));

            // Submit the job and wait for it to finish
            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);
        }
    }

     5. It runs successfully

     

     
