A Quick Look at Hadoop's Writable Serialization Interface, with a Hands-on Example


1. Introduction

When you develop against Hadoop, for example writing MapReduce jobs, you sometimes need to pass bean objects through the framework, and that requires serialization. Hadoop itself is written in Java, so could we simply use Java's built-in Serializable? Hadoop instead provides its own serialization interface, Writable. What advantages does it have, and how is it implemented? Below I share my own understanding of these questions and then put it into practice with a simple MapReduce job.

2. Java: Serializable

Let's look at plain Java serialization first.

1. Define the bean

package com.lee.writable;

import java.io.Serializable;

public class MyInfo implements Serializable {
    /** Declare serialVersionUID explicitly: the declared value is written into the
     *  serialized file, and during deserialization the runtime checks that the value
     *  in the file still matches the class.
     *  If it is not declared, Java generates one from the class file automatically,
     *  so any change to the class will make the UID check fail. */
    private static final long serialVersionUID = 15555555555789L;

    String name;
    int age;
    // If tel should not be serialized, mark it transient as below;
    // after deserialization tel would then be null
    // private transient String tel;
    private String tel;

    public static long getSerialVersionUID() {
        return serialVersionUID;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getTel() {
        return tel;
    }

    public void setTel(String tel) {
        this.tel = tel;
    }

    @Override
    public String toString() {
        return "MyInfo{" +
                "name='" + name + '\'' +
                ", age=" + age +
                ", tel='" + tel + '\'' +
                '}';
    }
}

2. The serialization utility class

package com.lee.writable;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class TestJavaSerializableUtils {

    // Serialize: save the object to a file
    public static void saveObject(Object object) throws Exception {
        ObjectOutputStream out = null;
        FileOutputStream fos = null;
        try {
            fos = new FileOutputStream("hadoop/src/file/testJavaSer.txt");
            out = new ObjectOutputStream(fos);
            out.writeObject(object);
        } finally {
            // Close the outer stream first; fall back to the raw stream if the
            // ObjectOutputStream was never created
            if (out != null) {
                out.close();
            } else if (fos != null) {
                fos.close();
            }
        }
    }

    // Deserialize: read the object back from the file
    public static Object readObject() throws Exception {
        ObjectInputStream in = null;
        FileInputStream fis = null;
        try {
            fis = new FileInputStream("hadoop/src/file/testJavaSer.txt");
            in = new ObjectInputStream(fis);
            return in.readObject();
        } finally {
            if (in != null) {
                in.close();
            } else if (fis != null) {
                fis.close();
            }
        }
    }
}

3. A simple test driver

package com.lee.writable;

public class TestJavaSerializable {
    public static void main(String[] args) {
        long begin = System.currentTimeMillis();
        // Build the test data
        MyInfo info = new MyInfo();
        info.setAge(18);
        info.setName("lee");
        info.setTel("155****0831");
        // Serialize
        try {
            TestJavaSerializableUtils.saveObject(info);
        } catch (Exception e) {
            System.out.println("Exception while saving: " + e.getMessage());
        }
        // Deserialize
        MyInfo myInfo;
        try {
            myInfo = (MyInfo) TestJavaSerializableUtils.readObject();
            System.out.println(myInfo);
        } catch (Exception e) {
            System.out.println("Exception while reading: " + e.getMessage());
        }
        long end = System.currentTimeMillis();
        System.out.println("Elapsed: " + (end - begin));
    }
}
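One aside on the utility class above: the manual finally blocks are easy to get wrong (close order, null checks). As a sketch rather than the original code, the same two methods could also be written with try-with-resources, which lets Java close the streams in the right order automatically:

    // Drop-in variants of the two methods in TestJavaSerializableUtils (sketch)
    public static void saveObject(Object object) throws Exception {
        try (ObjectOutputStream out = new ObjectOutputStream(
                new FileOutputStream("hadoop/src/file/testJavaSer.txt"))) {
            out.writeObject(object);
        }
    }

    public static Object readObject() throws Exception {
        try (ObjectInputStream in = new ObjectInputStream(
                new FileInputStream("hadoop/src/file/testJavaSer.txt"))) {
            return in.readObject();
        }
    }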

Drawbacks of JDK serialization

1. The serialized stream is too large. The byte array produced by JDK serialization carries extra baggage (checksums, a header, the whole class hierarchy), so it takes more storage space, consumes more bandwidth when the stream travels over the network, and performs relatively poorly as a result. The sketch below makes the size gap concrete.
2. It cannot cross language boundaries. This is close to a fatal flaw: cross-process service calls usually have to interoperate between different languages, and JDK serialization cannot, because it uses a private protocol internal to the Java language, which makes deserializing the data in any other language effectively impossible.
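To see the overhead in point 1, here is a rough sketch (the class name SerializedSizeDemo is mine, not part of the original project) that serializes the MyInfo bean from section 2 with ObjectOutputStream and compares it against writing the same three fields raw with DataOutputStream. Exact byte counts depend on the JVM and the class layout, but the JDK stream is typically several times larger than the bare payload:

package com.lee.writable;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.ObjectOutputStream;

// Compare JDK serialization size against the raw field payload (illustration only)
public class SerializedSizeDemo {
    public static void main(String[] args) throws Exception {
        MyInfo info = new MyInfo();
        info.setName("lee");
        info.setAge(18);
        info.setTel("155****0831");

        // JDK serialization: includes the stream header, class descriptor, field names, etc.
        ByteArrayOutputStream jdkBytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(jdkBytes)) {
            out.writeObject(info);
        }

        // Raw fields only: just the payload, no metadata
        ByteArrayOutputStream rawBytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(rawBytes)) {
            out.writeUTF(info.getName());
            out.writeInt(info.getAge());
            out.writeUTF(info.getTel());
        }

        System.out.println("JDK serialized size: " + jdkBytes.size() + " bytes");
        System.out.println("raw field size:      " + rawBytes.size() + " bytes");
    }
}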

3. Hadoop: Writable

Java serialization is heavyweight and unfriendly to network transfer, while the main bottlenecks of a big-data stack are precisely disk performance and network IO. Keeping serialization inside the framework also makes it easier to cope with problems caused by Hadoop version upgrades. So Hadoop ships with its own serialization mechanism, Writable, whose characteristics are:
1. Compact: small payloads, which saves bandwidth.
2. Fast: the serialization process itself is cheap.
3. Extensible (backward compatible): newer APIs can still read data written in older formats.
The minimal sketch after this list shows where the "compact" property comes from.
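As a minimal sketch (the class name and the printed numbers are mine, assuming hadoop-common is on the classpath as in the pom below), Hadoop's built-in Writable types write nothing but the payload itself, with no header or class metadata:

package com.lee.writable;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

// An IntWritable serializes to exactly 4 bytes; a Text writes a varint length
// followed by the UTF-8 bytes. That is the whole stream.
public class WritableSizeDemo {
    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);

        new IntWritable(2481).write(out); // 4 bytes
        new Text("kylin").write(out);     // 1-byte vint length + 5 UTF-8 bytes

        out.flush();
        System.out.println("total bytes written: " + buffer.size()); // expected: 10
    }
}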

Implementing Writable in practice

1. Test data and processing requirements

# Data format
# id, registered name, platform, original price, discount, HTTP status code
# Expected output
# registered name   original price   discount   final price
# hdfs              1000             18         982
1,13568436656,Android,2481,81,200
2,13568436656,Android,1116,16,200
3,kylin,PC,1000,152,200
4,kylin,Android,500,0,404
5,sqoop,IOS,5000,206,200
6,jd_lzs,Android,8000,7999,200
7,yarn,IOS,1116,16,200
8,hello_world,IOS,3156,56,200
9,shihaiyang,PC,240,0,200
10,wangerxin,PC,6960,200,200
11,xurui,Android,3659,300,200
12,luomeng,IOS,1938,38,500
13,luomeng,Android,918,7,200
14,hive,Android,180,180,200
15,dahaige,Android,1938,38,200
16,hbase,Android,3008,8,404
17,hello_world,Android,7335,35,404
18,hujie,Android,9531,1,200
19,hujie,Android,11058,1000,200
20,jd_lzs,Android,120,0,200
21,hdfs,PC,1000,18,200
22,hive,PC,1500,0,200

2. Writing the MapReduce code

0. Dependencies (pom.xml)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>learning</artifactId>
        <groupId>com.lee</groupId>
        <version>1.0.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>hadoop</artifactId>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.8</version>
            <scope>system</scope>
            <systemPath>d:/jdk1.8/lib/tools.jar</systemPath>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

1. The bean that accumulates spending per user

package com.lee.writable;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Rules for a custom Writable bean:
 * (1) It must implement the Writable interface.
 * (2) Deserialization creates the object reflectively, so a no-arg constructor is required.
 * (3) Override the serialization method:
 *     public void write(DataOutput out) throws IOException { ... }
 * (4) Override the deserialization method:
 *     public void readFields(DataInput in) throws IOException { ... }
 * (5) The deserialization order must match the serialization order exactly.
 * (6) Override toString() so the results are readable in the output file.
 */
/**
 * Bean for the payment statistics
 */
public class ShoppingBean implements Writable {

    // The test file only contains integers
    private int origin_price;
    private int discount;
    private int pay_money;

    // No-arg constructor, needed by reflection during deserialization
    public ShoppingBean() {
        super();
    }

    // Convenience constructor, used by the reducer to build its output
    public ShoppingBean(int para_origin, int para_discount) {
        super();
        this.origin_price = para_origin;
        this.discount = para_discount;
        this.pay_money = para_origin - para_discount;
    }

    // Serialization
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(origin_price);
        dataOutput.writeInt(discount);
        dataOutput.writeInt(pay_money);
    }

    // Deserialization; note the same field order as write()
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.origin_price = dataInput.readInt();
        this.discount = dataInput.readInt();
        this.pay_money = dataInput.readInt();
    }

    // Getters, setters and toString()
    public int getOrigin_price() {
        return origin_price;
    }

    public void setOrigin_price(int origin_price) {
        this.origin_price = origin_price;
    }

    public int getDiscount() {
        return discount;
    }

    public void setDiscount(int discount) {
        this.discount = discount;
    }

    public int getPay_money() {
        return pay_money;
    }

    public void setPay_money(int pay_money) {
        this.pay_money = pay_money;
    }

    @Override
    public String toString() {
        return "ShoppingBean{" +
                "origin_price=" + origin_price +
                ", discount=" + discount +
                ", pay_money=" + pay_money +
                '}';
    }
}

2. The Mapper

package com.lee.writable;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ShoppingMapper extends Mapper<LongWritable, Text, Text, ShoppingBean> {

    // Reused objects for the map output
    Text k = new Text();
    ShoppingBean shoppingBean = new ShoppingBean();

    // (byte offset of the line, line content, map output context)
    @Override
    protected void map(LongWritable key, Text values, Context context)
            throws IOException, InterruptedException {
        // Read the line as a String
        String line = values.toString();
        // Split on commas
        String[] iterms = line.split(",");
        // Filter on the HTTP status code
        if (iterms[iterms.length - 1].equals("404")) {
            return;
        }
        // Build and emit the output record
        String username = iterms[1];
        k.set(username);
        int origin_price = Integer.parseInt(iterms[iterms.length - 3]);
        int discount = Integer.parseInt(iterms[iterms.length - 2]);
        shoppingBean.setOrigin_price(origin_price);
        shoppingBean.setDiscount(discount);
        context.write(k, shoppingBean);
    }
}

3. The Reducer

package com.lee.writable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class ShoppingReducer extends Reducer<Text, ShoppingBean, Text, ShoppingBean> {

    @Override
    protected void reduce(Text key, Iterable<ShoppingBean> values, Context context)
            throws IOException, InterruptedException {
        int sumOrigin = 0;
        int sumDiscount = 0;
        // The values are already grouped by username; accumulate price and discount separately
        for (ShoppingBean value : values) {
            sumOrigin += value.getOrigin_price();
            sumDiscount += value.getDiscount();
        }
        // Build the output bean (the constructor computes pay_money)
        ShoppingBean bean = new ShoppingBean(sumOrigin, sumDiscount);
        // Emit
        context.write(key, bean);
    }
}

4. The driver (main class)

package com.lee.writable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class ShoppingDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Holds the various MapReduce runtime parameters
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // The jar that contains this driver
        job.setJarByClass(ShoppingDriver.class);

        // Mapper and Reducer classes
        job.setMapperClass(ShoppingMapper.class);
        job.setReducerClass(ShoppingReducer.class);

        // Map output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(ShoppingBean.class);

        // Final output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(ShoppingBean.class);

        // Input and output paths.
        // Note: use org.apache.hadoop.mapreduce.lib.input.FileInputFormat and
        // org.apache.hadoop.mapreduce.lib.output.FileOutputFormat,
        // not the classes of the same name under the org.apache.hadoop.mapred package.
        FileInputFormat.setInputPaths(job, new Path("hadoop/src/file/writable/data.txt"));
        FileOutputFormat.setOutputPath(job, new Path("hadoop/src/file/writable_out"));

        // Submit the job and its configuration to YARN and wait for it to finish
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

5. Run the job; the results match the expected output.
