MapReduce案例二:流量统计(序列化操作)

    技术2025-06-23  11

    在开发中往往常用的基本序列化类型,不能满足所有需求,比如在hadoop框架内部传递一个bean对象,那么该对象需要实现序列化接口 具体实现对象序列化步骤如下7步: 1、必须实现Writable接口 2、反序列化时,需要反射调用无参构造,所有必须有无参构造 3重新序列化方法 4、重写反序列化方法 5、反序列化的顺序和序列化的顺序安全一致 6、要把结果显示在文件中,需重写toString()方法 7、如果需要将自定义的bean放在key中传输,则还需要实现Comparable接口,因为MapReduce框架中的Shuffle过程要求都key必须能排序

    案例: 将上述表格中电话和上行流量、下行流量输出,并统计总流量,结果如下 代码: Bean代码

    package cn.kgc.flow; import org.apache.hadoop.io.Writable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; /** * @author czg * date * description */ public class FlowBean implements Writable { private long upFlow;//上行流量 private long downFlow;//下行流量 private long sumFlow;//总流量 // 序列化 @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeLong(upFlow); dataOutput.writeLong(downFlow); dataOutput.writeLong(sumFlow); } //反序列化 //注意:序列化和反序列化的顺序需要保持一致 @Override public void readFields(DataInput dataInput) throws IOException { this.upFlow=dataInput.readLong(); this.downFlow=dataInput.readLong(); this.sumFlow=dataInput.readLong(); } public FlowBean(){} public FlowBean(long upFlow, long downFlow, long sumFlow) { this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = sumFlow; } //自己创建一个set方法,用于map和reduce设置 public void det(long upFlow,long downFlow){ this.upFlow=upFlow; this.downFlow=downFlow; this.sumFlow=upFlow+downFlow; } public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDownFlow() { return downFlow; } public void setDownFlow(long downFlow) { this.downFlow = downFlow; } public long getSumFlow() { return sumFlow; } public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; } @Override public String toString() { return upFlow + "\t" + downFlow + "\t" + sumFlow; } }

    Map代码

    package cn.kgc.flow; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * @author czg * date * description */ public class FlowMapper extends Mapper<LongWritable, Text,Text,FlowBean> { Text k = new Text(); FlowBean v = new FlowBean(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1\获取一行 String line=value.toString(); //2、切割 String[] fields=line.split("\t"); //3、封装对象 String phone=fields[1]; //接收上行流量和下行流量。并塞入bean中 long upFlow=Long.parseLong(fields[fields.length-3]); long downDlow=Long.parseLong(fields[fields.length-2]); k.set(phone); v.setUpFlow(upFlow); v.setDownFlow(downDlow); // v.set(upFlow,downDlow); //4、写出 context.write(k,v); } }

    Reduce代码

    package cn.kgc.flow; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /** * @author czg * date * description */ public class FlowReduce extends Reducer<Text,FlowBean,Text,FlowBean> { FlowBean v=new FlowBean(); @Override protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException { long sum_upFlow=0; long sum_down_Flow=0; long sum_sum=0; //1\累加求和 for (FlowBean flowBean : values) { sum_upFlow+=flowBean.getUpFlow(); sum_down_Flow+=flowBean.getDownFlow(); sum_sum+=flowBean.getDownFlow()+flowBean.getUpFlow(); } v.setDownFlow(sum_down_Flow); v.setUpFlow(sum_upFlow); v.setSumFlow(sum_sum); //2、写出 context.write(key,v); } }

    Driver代码

    package cn.kgc.flow; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; /** * @author czg * date * description */ public class FlowDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //1\创建配置文件 Configuration conf=new Configuration(); Job job=Job.getInstance(conf,"flowBean"); //2、设置jar的位置 job.setJarByClass(FlowDriver.class); //3、设置map和reduce的位置 job.setMapperClass(FlowMapper.class); job.setReducerClass(FlowReduce.class); //4、设置map输出的key,value的类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); //5、设置reduce输出的key,value的类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); //6、设置输入输出路径 FileInputFormat.setInputPaths(job,new Path("file:///C:\\xuexi\\zoo\\data")); FileOutputFormat.setOutputPath(job,new Path("file:///C:\\xuexi\\zoo\\data1")); //7、提交程序运行 boolean b = job.waitForCompletion(true); System.exit(b?0:1); } }
    Processed: 0.015, SQL: 9