Flume中的事件event源码分析和自定义拦截器interceptor

    技术2023-12-03  108

    Flume中的Event 在Flume中一行文本内容会被反序列化成一个event【序列化是将对象状态转换为可保持或传输的格式的过程。与序列化相对的是反序列化,它将流转换为对象。这两个过程结合起来,可以轻松地存储和传输数据】,event的最大定义为2048字节,超过,则会切割,剩下的会被放到下一个event中,默认编码是UTF-8,这都是统一的。

    Event定义的数据结构

    public interface Event { /** * Returns a map of name-value pairs describing the data stored in the body. */ public Map<String, String> getHeaders(); /** * Set the event headers * @param headers Map of headers to replace the current headers. */ public void setHeaders(Map<String, String> headers); /** * Returns the raw byte array of the data contained in this event. */ public byte[] getBody(); /** * Sets the raw byte array of the data contained in this event. * @param body The data. */ public void setBody(byte[] body); }

    由此可知,一个Event中的header是一个Map<String, String>,而body是一个字节数组byte[]。但是我们实际使用中真正传输的只有body中的数据,而header传输的数据是不会被sink出去的。

    那么,Event又是如何产出以及如何分流的呢?

    while ((line = reader.readLine()) != null) { synchronized (eventList) { sourceCounter.incrementEventReceivedCount(); eventList.add(EventBuilder.withBody(line.getBytes(charset))); if(eventList.size() >= bufferCount || timeout()) { flushEventBatch(eventList); } } } public static Event withBody(byte[] body, Map<String, String> headers) { Event event = new SimpleEvent(); if(body == null) { body = new byte[0]; } event.setBody(body); if (headers != null) { event.setHeaders(new HashMap<String, String>(headers)); } return event; }

    从源码中可以看出,这里是单纯的包装了event的body内容,line即是我们真正的数据内容,将其转换成UTF-8编码的字节内容分装到event的body中,它的header是null。用的是SimpleEvent类。header的话,就是在分装Event对象的时候,我们可以自定义的设置一些key-value对,这样做的目的,是为了后续的通道多路复用做准备的。

    在source端产出event的时候,通过header去区别对待不同的event,然后在sink端的时候,我们就可以通过header中的key来将不同的event输出到对应的sink下游去,这样就将event分流出去了,但是这里有一个前提:不建议通过对event的body解析来设置header,因为flume就是一个水槽,水槽是不会在中间对水进行加工的,要加工,等水流出去了再加工。

    a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = host a1.sources.r1.interceptors.i1.hostHeader = hostname

    上面的host是你自定义的一个拦截器,hostHeader都是自定义的key,这样你就在event产出的时候,给各个event定义了不同的header,然后再通过多路复用通道的模式进行分流。

    a1.sources.r1.selector.type = multiplexing a1.sources.r1.selector.header = state a1.sources.r1.selector.mapping.CZ = c1 a1.sources.r1.selector.mapping.US = c2 c3 a1.sources.r1.selector.default = c4

    这样你就可以根据event的header中的key将其放入不同的channel中,紧接着再通过配置多个sink去不同的channel取出event,将其分流到不同的输出端。

    Flume中的拦截器 Flume中拦截器的作用就是在app(应用程序日志)和 source 之间对app日志进行拦截并做一些处理,即在日志进入到source之前,对日志进行一些包装、清新过滤等操作。

    官网提供的拦截器

    Timestamp Interceptor : 在event的header中添加一个key叫:timestamp,value为当前的时间戳。这个拦截器在sink为hdfs 时很有用; Host Interceptor:在event的header中添加一个key叫:host,value为当前机器的hostname或者; Static Interceptor: 可以在event的header中添加自定义的key和value。 Regex Filtering Interceptor: 通过正则来清洗或包含匹配的events。 Regex Extractor Interceptor:通过正则表达式来在header中添加指定的key,value则为正则匹配的部分

    像很多java的开源项目如springmvc中的拦截器一样,Flume的拦截器也是chain形式的,可以对一个source指定多个拦截器,按先后顺序依次处理。

    当然,也可以根据项目中的实际业务需求自定义拦截器,下面举个例子演示一下自定义拦截器如何使用。 自定义拦截器步骤 a)实现 org.apache.flume.interceptor.Interceptor 接口; b)重写四个方法

    initialize 初始化public Event intercept(Event event) 处理单个Eventpublic List intercept(List events) 处理多个Event,在这个方法中会调用Event intercept(Event event)方法close 方法 c)静态内部类,实现Interceptor.Builder

    案例演示:自定义一个日志类型区分拦截器LogTypeInterceptor 在Maven工程需要导入的依赖flume-ng-core:

    <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> <version>1.7.0</version> </dependency>

    代码参考如下:

    import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; import java.util.Map; public class LogTypeInterceptor implements Interceptor { @Override public void initialize() { } @Override public Event intercept(Event event) { //1.获取body数据 byte[] body = event.getBody(); String log = new String(body, Charset.forName("UTF-8")); //2.获取header Map<String, String> headers = event.getHeaders(); //3.判断数据类型并向header中赋值 if (log.contains("start")) { headers.put("topic","topic_start"); }else { headers.put("topic","topic_event"); } return event; } @Override public List<Event> intercept(List<Event> events) { ArrayList<Event> interceptors = new ArrayList<>(); for (Event event : events) { Event intercept1 = intercept(event); interceptors.add(intercept1); } return interceptors; } @Override public void close() { } public static class Builder implements Interceptor.Builder{ @Override public Interceptor build() { return new LogTypeInterceptor(); } @Override public void configure(Context context) { } } }

    代码编写完成之后打包只需要单独包,不需要将带依赖的包上传,打包之后要放入Flume的lib文件夹下面。

    Flume配置文件参考如下:

    a1.sources=r1 a1.channels=c1 c2 # configure source a1.sources.r1.type = TAILDIR a1.sources.r1.positionFile = /home/hadoop/app/flume-1.7.0-bin/test/log_position.json a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /tmp/logs/app.+ a1.sources.r1.fileHeader = true a1.sources.r1.channels = c1 c2 #interceptor a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = com.xsluo.flume.interceptor.LogTypeInterceptor$Builder a1.sources.r1.selector.type = multiplexing a1.sources.r1.selector.header = topic a1.sources.r1.selector.mapping.topic_start = c1 a1.sources.r1.selector.mapping.topic_event = c2 # configure channel a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel a1.channels.c1.kafka.bootstrap.servers = weekend110:9092,weekend01:9092,weekend02:9092 a1.channels.c1.kafka.topic = topic_start a1.channels.c1.parseAsFlumeEvent = false a1.channels.c1.kafka.consumer.group.id = flume-consumer a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel a1.channels.c2.kafka.bootstrap.servers = weekend110:9092,weekend01:9092,weekend02:9092 a1.channels.c2.kafka.topic = topic_event a1.channels.c2.parseAsFlumeEvent = false a1.channels.c2.kafka.consumer.group.id = flume-consumer
    Processed: 0.019, SQL: 9