序列化接口
package org.apache.kafka.common.serialization; public interface Serializer<T> extends Closeable { //kafkaProducer实例时调用,配置当前类,指定编码 void configure(Map<String, ?> configs, boolean isKey); //序列化,转为字节数组 byte[] serialize(String topic, T data); //一般是一个空方法 @Override void close(); }maven依赖
<!--fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.62</version> </dependency> <!--jackson --> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.10.1</version> </dependency> <!--protobuf--> <dependency> <groupId>com.baidu</groupId> <artifactId>jprotobuf-precompile-plugin</artifactId> <version>1.2.8</version> </dependency>实体类
import com.baidu.bjf.remoting.protobuf.FieldType; import com.baidu.bjf.remoting.protobuf.annotation.Protobuf; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; /** * @author bigTree */ @Data @AllArgsConstructor @NoArgsConstructor @Builder public class People { //使用了百度封装的protobuf @Protobuf(fieldType = FieldType.STRING, order = 2) private String name; @Protobuf(fieldType = FieldType.INT32, order = 1, required = true) private int age; }自定义序列化类
import com.alibaba.fastjson.JSON; import com.baidu.bjf.remoting.protobuf.Codec; import com.baidu.bjf.remoting.protobuf.ProtobufProxy; import com.test.demo.pojo.People; import lombok.SneakyThrows; import org.apache.kafka.common.serialization.Serializer; import org.codehaus.jackson.map.ObjectMapper; import java.util.Map; /** * @author bigTree */ public class DefineSerializer implements Serializer<People> { String encode; ObjectMapper mapper; Codec<People> simpleTypeCodec; @Override public void configure(Map<String, ?> configs, boolean isKey) { encode = "UTF-8"; mapper = new ObjectMapper(); simpleTypeCodec = ProtobufProxy.create(People.class); } @SneakyThrows @Override public byte[] serialize(String topic, People people) { if(people == null){ return null; } byte[] SerializerPeople; //fastjson序列化 // SerializerPeople = JSON.toJSONBytes(people); //jackson序列化 // SerializerPeople = mapper.writeValueAsBytes(people); //protobuf序列化:必须提供无参构造器 SerializerPeople = simpleTypeCodec.encode(people); return SerializerPeople; } @Override public void close() {} }指定序列化器
//如果使用了自定义序列化器,那么消费者也必须指定对应的反序列化器 properties.put("value.serializer", DefineSerializer.class.getName());