logback.xml
<?xml version="1.0" encoding="UTF-8"?> <configuration scan="true" scanPeriod="5 seconds" debug="true" > ...... <appender name="KAFKA" class="com.jdd.logback.kafka.KafkaAppender" > <layout> <pattern>%d{yy-MM-dd.HH:mm:ss.SSS} [%-16t] %-5p %-22c %X{ServiceId} - %m%n</pattern> </layout> <servers>11.222.33.444:8888,11.222.33.555:8888</servers> </appender> ...... </configuration>KafkaAppender
import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.Layout; import ch.qos.logback.core.UnsynchronizedAppenderBase; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; /** * @author: jinyang * @date: 2020-05-19 19:39 */ public class KafkaAppender extends UnsynchronizedAppenderBase<ILoggingEvent> { protected Layout<ILoggingEvent> layout; protected static KafkaProducer<String, String> producer; protected static final String KAFKA_LOGGER_NAME_PREFIX = "org.apache.kafka."; @Override public void start() { if (isStarted()) { return; } if (this.layout == null) { addError("No layout set for the appender named [" + name + "]."); return; } if (this.servers == null || servers.length() == 0) { addError("servers could not be blank."); return; } boolean initSuccess = initKafkaProducer(); if (!initSuccess) { return; } super.start(); } @Override protected void append(ILoggingEvent event) { if (event.getLoggerName().startsWith(KAFKA_LOGGER_NAME_PREFIX)) { addInfo(event.getMessage()); return; } String eventStr = this.layout.doLayout(event); sendKafkaMsg(eventStr); } protected void sendKafkaMsg(String eventStr) { ProducerRecord<String, String> producerRecord = new ProducerRecord<>("logback_test_topic", eventStr); producer.send(producerRecord); } protected String servers; protected String keySerializer; protected String valueSerializer; protected boolean initKafkaProducer() { try { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); properties.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer == null ? "org.apache.kafka.common.serialization.StringSerializer" : keySerializer); properties.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer == null ? "org.apache.kafka.common.serialization.StringSerializer" : valueSerializer); properties.put(ProducerConfig.ACKS_CONFIG, "0"); properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000"); producer = new KafkaProducer<>(properties); return true; } catch (Exception e) { addError("Failed to construct kafka producer", e); return false; } } public Layout<ILoggingEvent> getLayout() { return layout; } public void setLayout(Layout<ILoggingEvent> layout) { this.layout = layout; } public void setServers(String servers) { this.servers = servers; } public void setKeySerializer(String keySerializer) { this.keySerializer = keySerializer; } public void setValueSerializer(String valueSerializer) { this.valueSerializer = valueSerializer; } }