I. Introduction
When developing Hadoop code such as MapReduce jobs, you sometimes need to pass bean objects around inside the framework, and that requires serialization. Hadoop is written in Java, so would Java's own Serializable mechanism do the job? Hadoop also ships its own serialization interface, Writable: what are its advantages, and how is it actually used? Below I share my own understanding of these questions and then walk through a simple hands-on MapReduce example.
II. Java – Serializable
Let's first look at plain Java serialization.
1. Building the bean class
package com.lee.writable;

import java.io.Serializable;

public class MyInfo implements Serializable {

    private static final long serialVersionUID = 15555555555789L;

    String name;
    int age;
    private String tel;

    public static long getSerialVersionUID() {
        return serialVersionUID;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getTel() {
        return tel;
    }

    public void setTel(String tel) {
        this.tel = tel;
    }

    @Override
    public String toString() {
        return "MyInfo{" +
                "name='" + name + '\'' +
                ", age=" + age +
                ", tel='" + tel + '\'' +
                '}';
    }
}
2. Serialization utility class
package com.lee.writable;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class TestJavaSerializableUtils {

    public static void saveObject(Object object) throws Exception {
        // try-with-resources closes the streams in the right order (ObjectOutputStream
        // before FileOutputStream) and avoids an NPE if construction fails.
        try (FileOutputStream fos = new FileOutputStream("hadoop/src/file/testJavaSer.txt");
             ObjectOutputStream out = new ObjectOutputStream(fos)) {
            out.writeObject(object);
        }
    }

    public static Object readObject() throws Exception {
        try (FileInputStream fis = new FileInputStream("hadoop/src/file/testJavaSer.txt");
             ObjectInputStream in = new ObjectInputStream(fis)) {
            return in.readObject();
        }
    }
}
3. Test main class
package com.lee.writable;

public class TestJavaSerializable {

    public static void main(String[] args) {
        long begin = System.currentTimeMillis();

        MyInfo info = new MyInfo();
        info.setAge(18);
        info.setName("lee");
        info.setTel("155****0831");

        try {
            TestJavaSerializableUtils.saveObject(info);
        } catch (Exception e) {
            System.out.println("Exception while saving: " + e.getMessage());
        }

        MyInfo myInfo;
        try {
            myInfo = (MyInfo) TestJavaSerializableUtils.readObject();
            System.out.println(myInfo);
        } catch (Exception e) {
            System.out.println("Exception while reading: " + e.getMessage());
        }

        long end = System.currentTimeMillis();
        System.out.println("Elapsed time: " + (end - begin));
    }
}
Drawbacks of JDK serialization
1. The serialized stream is large
The byte array produced by JDK serialization is comparatively big (it carries extra checks, a stream header, and the whole class hierarchy), so it takes more storage, and when the stream travels over the network it consumes more bandwidth, which makes performance relatively poor.
2. It cannot cross languages
This drawback is almost fatal. Cross-process service calls usually have to interoperate between different languages, and JDK serialization simply cannot: it uses a private protocol internal to the Java language, which is a serious obstacle when other languages try to deserialize the data.
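As a rough illustration of the first point, here is a minimal sketch (my own addition; the class name SerializedSizeDemo is made up) that serializes one MyInfo instance into memory and prints the size of the resulting stream. The three fields themselves amount to only a few dozen bytes, but the JDK stream also carries a header, class descriptors, and field metadata, so the printed size is noticeably larger.

package com.lee.writable;

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;

public class SerializedSizeDemo {
    public static void main(String[] args) throws Exception {
        MyInfo info = new MyInfo();
        info.setName("lee");
        info.setAge(18);
        info.setTel("155****0831");

        // Serialize into memory instead of a file so the size can be inspected directly.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(info);
        }
        System.out.println("JDK serialized size: " + bos.size() + " bytes");
    }
}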
III. Hadoop – Writable
Java serialization is heavyweight and unfriendly to network transfer, while the bottlenecks of a big-data stack are mostly disk performance and network IO. In addition, a serialization mechanism implemented inside the framework itself is easier to keep working across Hadoop version upgrades. For these reasons, the Hadoop framework has its own serialization implementation: Writable.
Its characteristics:
1. Compact: small footprint, saves bandwidth;
2. Fast: serialization and deserialization are quick;
3. Extensible (backward compatible): new APIs can still read data written in old formats;
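To see where the compactness comes from, here is a minimal sketch (my own illustration, using Hadoop's built-in IntWritable and Text rather than anything from this article's code): a Writable writes nothing but its own field bytes through DataOutput, with no stream header or class metadata.

package com.lee.writable;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class WritableSizeDemo {
    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);

        // IntWritable writes exactly 4 bytes; Text writes a varint length plus the UTF-8 bytes.
        new IntWritable(18).write(out);
        new Text("lee").write(out);
        out.flush();

        System.out.println("Writable serialized size: " + bos.size() + " bytes");
    }
}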
The Writable implementation process
1. Sample data and processing requirement
Each record has the form: id, username, platform, original price, discount, status code. The job below aggregates, per user, the total original price, the total discount, and the resulting amount actually paid, skipping records whose status code is 404.
1,13568436656,Android,2481,81,200
2,13568436656,Android,1116,16,200
3,kylin,PC,1000,152,200
4,kylin,Android,500,0,404
5,sqoop,IOS,5000,206,200
6,jd_lzs,Android,8000,7999,200
7,yarn,IOS,1116,16,200
8,hello_world,IOS,3156,56,200
9,shihaiyang,PC,240,0,200
10,wangerxin,PC,6960,200,200
11,xurui,Android,3659,300,200
12,luomeng,IOS,1938,38,500
13,luomeng,Android,918,7,200
14,hive,Android,180,180,200
15,dahaige,Android,1938,38,200
16,hbase,Android,3008,8,404
17,hello_world,Android,7335,35,404
18,hujie,Android,9531,1,200
19,hujie,Android,11058,1000,200
20,jd_lzs,Android,120,0,200
21,hdfs,PC,1000,18,200
22,hive,PC,1500,0,200
2. Writing the MapReduce code
0. Dependencies (pom.xml)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>learning</artifactId>
        <groupId>com.lee</groupId>
        <version>1.0.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>hadoop</artifactId>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.8</version>
            <scope>system</scope>
            <systemPath>d:/jdk1.8/lib/tools.jar</systemPath>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
1. Bean class for the spending statistics
package com.lee.writable;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class ShoppingBean implements Writable {

    private int origin_price;
    private int discount;
    private int pay_money;

    // Hadoop needs a public no-arg constructor to create the bean via reflection.
    public ShoppingBean() {
        super();
    }

    public ShoppingBean(int para_origin, int para_discount) {
        super();
        this.origin_price = para_origin;
        this.discount = para_discount;
        this.pay_money = para_origin - para_discount;
    }

    // Serialization: write the fields in a fixed order.
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(origin_price);
        dataOutput.writeInt(discount);
        dataOutput.writeInt(pay_money);
    }

    // Deserialization: read the fields back in exactly the order they were written.
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.origin_price = dataInput.readInt();
        this.discount = dataInput.readInt();
        this.pay_money = dataInput.readInt();
    }

    public int getOrigin_price() {
        return origin_price;
    }

    public void setOrigin_price(int origin_price) {
        this.origin_price = origin_price;
    }

    public int getDiscount() {
        return discount;
    }

    public void setDiscount(int discount) {
        this.discount = discount;
    }

    public int getPay_money() {
        return pay_money;
    }

    public void setPay_money(int pay_money) {
        this.pay_money = pay_money;
    }

    @Override
    public String toString() {
        return "ShoppingBean{" +
                "origin_price=" + origin_price +
                ", discount=" + discount +
                ", pay_money=" + pay_money +
                '}';
    }
}
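A quick way to sanity-check the bean (my own sketch, not part of the original job; the class name RoundTripCheck is made up) is to push it through write() and readFields() via an in-memory byte stream:

package com.lee.writable;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class RoundTripCheck {
    public static void main(String[] args) throws Exception {
        ShoppingBean original = new ShoppingBean(2481, 81);

        // Serialize: three ints, so exactly 12 bytes on the wire.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bos));

        // Deserialize into a fresh bean and print it for comparison.
        ShoppingBean copy = new ShoppingBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));

        System.out.println("bytes on the wire: " + bos.size());  // 12
        System.out.println(copy);  // ShoppingBean{origin_price=2481, discount=81, pay_money=2400}
    }
}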
2. The Mapper class
package com.lee.writable;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ShoppingMapper extends Mapper<LongWritable, Text, Text, ShoppingBean> {

    Text k = new Text();
    ShoppingBean shoppingBean = new ShoppingBean();

    @Override
    protected void map(LongWritable key, Text values, Context context)
            throws IOException, InterruptedException {
        String line = values.toString();
        String[] iterms = line.split(",");

        // Skip failed requests (status code 404 in the last column).
        if (iterms[iterms.length - 1].equals("404")) {
            return;
        }

        String username = iterms[1];
        k.set(username);

        int origin_price = Integer.parseInt(iterms[iterms.length - 3]);
        int discount = Integer.parseInt(iterms[iterms.length - 2]);
        shoppingBean.setOrigin_price(origin_price);
        shoppingBean.setDiscount(discount);

        context.write(k, shoppingBean);
    }
}
3. The Reducer class
package com.lee.writable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class ShoppingReducer extends Reducer<Text, ShoppingBean, Text, ShoppingBean> {

    @Override
    protected void reduce(Text key, Iterable<ShoppingBean> values, Context context)
            throws IOException, InterruptedException {
        int sumOrigin = 0;
        int sumDiscount = 0;

        // Accumulate the totals for one user; the constructor then derives pay_money.
        for (ShoppingBean value : values) {
            sumOrigin += value.getOrigin_price();
            sumDiscount += value.getDiscount();
        }

        ShoppingBean bean = new ShoppingBean(sumOrigin, sumDiscount);
        context.write(key, bean);
    }
}
4. The main (driver) class
package com.lee.writable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class ShoppingDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(ShoppingDriver.class);
        job.setMapperClass(ShoppingMapper.class);
        job.setReducerClass(ShoppingReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(ShoppingBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(ShoppingBean.class);

        FileInputFormat.setInputPaths(job, new Path("hadoop/src/file/writable/data.txt"));
        FileOutputFormat.setOutputPath(job, new Path("hadoop/src/file/writable_out"));

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
5. Execution result: the output meets expectations
Running the driver produces the output directory hadoop/src/file/writable_out, with one line per user: the username as the key and the aggregated ShoppingBean (rendered by its toString()) as the value.
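As a cross-check (my own hand computation from the sample data above, with 404 records excluded), the first two users should aggregate to roughly the following lines in the output file:

13568436656	ShoppingBean{origin_price=3597, discount=97, pay_money=3500}
kylin	ShoppingBean{origin_price=1000, discount=152, pay_money=848}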