kafka java序列化时间日期按照统一格式以及使用gson来做序列化反序列化配置

    技术2022-07-11  80

    kafka java序列化时间日期按照统一格式以及使用gson来做序列化反序列化配置 直接上代码

    public class GsonSerializer<T> implements Serializer<T> { public static final String CONFIG_VALUE_CLASS = "value.deserializer.class"; public static final String CONFIG_KEY_CLASS = "key.deserializer.class"; private Class<T> cls; private Gson gson = new GsonBuilder().setLongSerializationPolicy(LongSerializationPolicy.DEFAULT).registerTypeAdapter(LocalDateTime.class, new DateTimeSerializer()).registerTypeAdapter(Date.class, new DateTimeSerializer()).create(); @Override public void configure(Map<String, ?> config, boolean isKey) { // this is called right after construction // use it for initialisation String configKey = isKey ? CONFIG_KEY_CLASS : CONFIG_VALUE_CLASS; String clsName = String.valueOf(config.get(configKey)); try { cls = (Class<T>) Class.forName(clsName); } catch (ClassNotFoundException e) { System.err.printf("Failed to configure GsonDeserializer. " + "Did you forget to specify the '%s' property ?%n", configKey); } } @Override public byte[] serialize(String s, T t) { return gson.toJson(t).getBytes(Charsets.UTF_8); } @Override public void close() { // this is called right before destruction } }

    上面是序列化类

    @Slf4j public class GsonDeserializer<T> implements Deserializer<T> { public static final String CONFIG_VALUE_CLASS = "value.deserializer.class"; public static final String CONFIG_KEY_CLASS = "key.deserializer.class"; private Class<T> cls; private Gson gson = new GsonBuilder().setLongSerializationPolicy(LongSerializationPolicy.DEFAULT).registerTypeAdapter(LocalDateTime.class, new DateTimeDeserializer()).create(); @Override public void configure(Map<String, ?> config, boolean isKey) { String configKey = isKey ? CONFIG_KEY_CLASS : CONFIG_VALUE_CLASS; String clsName = String.valueOf(config.get(configKey)); try { cls = (Class<T>) Class.forName(clsName); } catch (ClassNotFoundException e) { System.err.printf("Failed to configure GsonDeserializer. " + "Did you forget to specify the '%s' Stringproperty ?%n", configKey); } } @Override public T deserialize(String topic, byte[] bytes) { try { return (T) gson.fromJson(new InputStreamReader(new ByteArrayInputStream(bytes),"UTF-8"), cls); } catch (UnsupportedEncodingException e) { log.error("Failed to desserialize ", e); e.printStackTrace(); return null; } } @Override public void close() {} }

    上面是反序列化类

    public class DateTimeSerializer implements JsonSerializer { private DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:SS"); private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:SS"); @Override public JsonElement serialize(Object o, Type type, JsonSerializationContext jsonSerializationContext) { if((o instanceof LocalDateTime || type == LocalDateTime.class) && o != null){ return new JsonPrimitive(((LocalDateTime) o).format(dateTimeFormatter).toString()); } if((o instanceof Date || type == LocalDateTime.class) && o != null){ return new JsonPrimitive(sdf.format((Date) o)); } return null; } }

    上面是时间序列化类

    public class DateTimeDeserializer implements JsonDeserializer { String pattern = "yyyy-MM-dd HH:mm:SS"; DateFormat df = new SimpleDateFormat(pattern); @Override public Date deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException { try { String result = json.getAsJsonPrimitive().getAsString(); return df.parse(result); } catch (ParseException e) { e.printStackTrace(); } return null; } }

    上面是时间反序列化类

    将上面的序列化工具类加入的kafka配置里面

    @Configuration public class KafkaConfig { @Autowired private KafkaProperties kafkaProperties; private String MQ_TOPIC_SSO_USERCHANGED = KafkaParams.MQ_TOPIC_SSO_USERCHANGED; // Producer configuration @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(kafkaProperties.buildProducerProperties()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GsonSerializer.class); return props; } @Bean public ProducerFactory<String, Object> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public KafkaTemplate<String, Object> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } @Bean public NewTopic adviceTopic() { return new NewTopic(MQ_TOPIC_SSO_USERCHANGED, 3, (short) 1); } }
    Processed: 0.010, SQL: 9