logstash简介及架构(一篇精通直接上手)

    技术2025-05-23  51

    1、logstash介绍

    数据收集处理引擎ETL工具

    2、logstash架构简介

    Logstash Event是一个java object,它对外暴露了获取内部字段以及修改内部字段值的一些api。

    举例

    stdin:标准输入

    codec是line,这个codec的作用就是按照每一行切割数据,就是说把每一行都转换成logstash event

    stdout:标准输出

    codec是json,这个codec的作用就是把每一个logstash event转换成json的对象输出。

    line codec decode是按照换行符\n切割的。所以一行原始数据被分成了两个event。

    所以,在原始数据和event之间不是一对一的关系。

    event经过json codec encode后就把每一个logstash event输出成 json object。

    3、测试

    可以看到上图的数据有一个message为空的数据,是因为在bar的后面又换了一行。

    4、详细讲解logstash的架构

    下图是logstash6.x的架构

    input是可以有多个的,每个input都有自己的codec,箭头代表数据流向。

    数据会经过Queue,Queue会把流入的数据分发到不同的pipeline中。

    每一个pipeline有Batcher、filter、output。

    Batcher的作用是批量的从Queue中取数据。Batcher是可以配置的,比如一次取一百个数据。

    看上图可知我有三个pipeline。

    5、Life of an Event

    这里介绍logstash Event的生命历程。

    web.log为我们的配置文件

    随着时间的推移,Batcher会收集越来越多的数据,当达到了处理数据的条件之后(Batcher会有两种条件,数目或时间,数目达到了设定的阈值或者是时间到了),Batcher就会把数据发送到filter,在filter中对每一条logstash Event进行相关的处理。

    最后output就会把数据输出到你指定的输出。

    那么,输出之后,会把处理的ACK发送给Queue,代表着我刚才处理了哪些event。

    6、Queue的分类

    logstash有两个Queue,一个是In Memory在内存中的Queue,这个Queue是固定大小的,是没法通过配置文件来修改的。坏处就是下图所示。

    为了解决这个问题,推出了持久化Queue就是Persistent Queue In Disk,这个就是基于磁盘对处理数据进行一个记录。

    7、Persistent Queue In Disk

    Data从Input进来,

    Data到PQ中。PQ会把这个数据在磁盘中备份一份。PQ告诉Input说这条数据我已经收到了。前提是Input要支持这种机制,有了这种机制才能感知目前logstash的这种处理能力。

    接下来蓝色部分,

    数据就从PQ到filter outputoutput把事件处理之后就会发送ACK到PQPQ收到了这个ACK之后,就会把磁盘上的数据删除掉

    这样就得知,即使发生了宕机,我的数据还是在disk中有的,只需要重启logstash把disk中的数据重新消费一次就解决了。

    8、Memory Queue与PQ的性能

    可以看到性能的下降不是很严重,估计是在5%以内,如果没有特殊需求一般建议把logstash的PQ打开。

    9、打开PQ

    queue.max_bytes默认是1GB,开大一点后,Queue能存储的数据也就多了一点。

     还有一些:

    path.queue PQ存到磁盘的哪个位置。

    queue.page_capacity 控制消息队列每一个文件的大小。

    queue.checkpoint.writes 提升容灾能力,如果是1 表示每写一个数据都去做盘,顶多也就会丢失一条数据。

    10、logstash中线程的相关情况

    调优的时候主要就是调整 Pipeline Workder Thread数。

    pipeline.workers可以配置的比cpu核数高,这样也不会浪费资源,当线程处于闲置状态的时候也不会占用cpu的资源。

    -w 是指work output的数量。

    batch等待的时长达到delay的时长,即使数据没达到size,还是会把数据发送到file output里来处理,它的作用就是减少数据到logstash的一个延迟,默认是50ms,一般都不需要改。

    我们可以使用VisualVM去看java jvm的一些情况。堆内存分配的情况,线程的情况。

    因为我的机器是4核的所以启动了4条线程。

    下面我们设置 -w 1

    11、logstash的相关配置

    分两种:

    logstash设置文件:在conf文件夹中,在logstash中成为 setting files(包含logstash.yaml 和 jvm.options)pipeline配置文件:就是input,filter,output的配置文件

    logstash.yml配置会在logstash启动的时候就会使用的。

    node.name就是设置logstash实例的名字等。。

    一个path.data只能被一个logstash实例使用。后面我们将多实例的时候主要就是对path.data进行配置。其主要作用就是logstash启用了持久化队列PQ,那么持久化队列的内容就会存在path.data下。

    path.config可以设置具体文件的地址,也可以设置文件夹的地址,如果是文件夹的地址,logstash会把文件夹里所有以.conf结尾的文件按照字母的顺序拼接成一个文件。

    我们可以通过配置不同的--path.settings来在一台机器上启动多个logstash实例。

    -e 是把pipeline的内容直接写到命令行里面,我们一般用这个做一些快速的pipeline内容的校验。

    --debug 打开调试日志

    -t 检验一下logstash加载进来的pipeline内容是否有错误,有错误的话就报错。是不会实际去执行logstash处理数据的流程的,只是检验。

    从上图可以看到这个配置是没问题的。我们修改一下codec为不存在的就会报错,如下图

    下面是关于--debug的参数,可以看到多出了非常多的debug级别的日志输出

     

    bin/logstash --path.settings 这样的命令是推荐使用的,因为可以把所有的配置加到文件里面方便做版本化管理。

    12、logstash多实例运行方式

    拷贝一份config文件夹并命名为instance2,同时修改其logstash.yml配置信息

    path.data 没有写完整路径就会在logstash的home目录下去创建这个目录

    修改config文件夹下的logstash.yml文件的配置信息

    分别启动

    如果不加--path.setting,则logstash会读取默认的config目录的配置

    进行测试 

    发现加载了不同的pipeline,通过这种方式可以在一台机器上优雅的启动多个logstash实例。

    13、pipeline配置

    左边可以理解为logstash Event,因为本身就是java Object。

    sprintf就是一种格式化输出的方法。

    当我们的条件比较复杂的时候可以用分组操作符把条件括起来,再结合别的操作,相对于清晰一点。

    14、Input插件详解及glob讲解

    14.1、stdin 

    举例说明 

    plain:什么都不做的解码 (后面有讲codec插件的详解时有详细提到)

    std:我们自己定义的类型

    输出结果中的tags之所以有[0]是因为我们用的是rubydebug输出的。


    14.2、file 

     

    sincedb是解决从上次读取的一个方案。

    归档:比如文件被rename或者删除了。

    下面是归档操作发生时候的底层原理源码:

    start_postion:起始的位置,有两个参数可选,默认是end,表示运行logstash运行之后只会读取从logstash运行之后新进来的数据。如果想让logstash从头读取要设置为beginning。注意只有在第一次读取的时候这个beginning才生效,一旦读取过这个文件在sincedb中有记录了,logstash之后再去跑发现在sincedb中有记录了,就不会生效了也就是不会再从头读取了。这个的坏处就是我们做一些调试的时候比较麻烦只能把sincedb文件给删掉。我们可以把sincedb_path设置为/dev/null 这是个特殊的文件,所有写入到这个文件的内容都不会存储,那么这样再把start_postion设置为beginning这样每一次运行logstash的时候都是从头读取的。(后面有针对sincedb_path的配置示例)

    discover_interval:logstash会定时的去检查是否有匹配你的path目录下新的文件出现。

    ignore_older:如果超过设定的时长,logstash就不回去open这个文件,只会去监控它。好处就在于节省一些文件句柄,节省资源。

    close_older:存在的意义也是及时的去释放文件句柄,来提供给那些真正需要读取文件的操作。ignore_older和close_older主要解决文件句柄释放的问题,如果是只读取一两个文件一般不会遇到问题,只有在读取大量的文件时候才考虑这两个参数的调优。


    举例 

    14.3、kafka 

    15、Codec Plugin详解

    Codec Plugin就是在input和output中对数据做encode和decode操作。

    15.1、dots和rubydebug

    测试

    我们使用下面的三条命令去做测试

    可以看出来输出的数据不是json,是一个rubydebug的输出。@timestamp、host、@version是logstash自动加的一个内容。

    可以看出来输出了一个“点”。当我们不想看到详细的输出,只想看logstash是否正在运行,比如我们做压力测试的时候就可以用这个。

    上图可以看出,输入hello,world的时候报错了。因为输入的格式不对。

    15.2、multiline 

    例如上面的日志格式,我们希望把java输出的堆栈日志作为一条Event来处理就需要用到multiline。

    negate是对pattern取反,默认是false。

    举例

    观察可以发现前面有空格,有空的时候都是属于第一行的内容。pattern里面的正则表达式表示以空格开头。

    实际运行

    再举个例子:我们现在是做代码有关的,在代码中做换行一般都是用\做结尾。

    pattern里面的正则表示以斜杠结尾的这一行和他的下一行一起组成同一个logstash Event

    上图可以看到数据来了之后就直接输出了,是因为匹配的是\符号。最后一行没有\就认为是结束了。

    再看一个例子

    这个例子以时间戳开头的

    这里的pattern的正则表示:以[时间戳] 开头的行。

    把nagate设置为true,表示不以[时间戳]开头的行。

    what:属于上一个

    当我们第一次输入是没有输出的,当我们再次输入才有输出,原因是跟之前一样的,要识别你是上一个logstash Event什么时候结束。

    16、Filter Plugin

    16.1、data

    @timestamp 是logstash Event默认的一个时间字段,这个字段不能是没有。我们也可以把自己解析出来的日期值放到指定的其他字段里。其中的Z就是零时区的。

    实际执行效果

    data的一些参数

    target字段就是指明上面match匹配后的时间戳要赋给哪个字段。

    16.2、grok

    问题的抛出

    我们想把这条apache的这条非结构化的日志数据解析成json格式的。

    如果我们用正则表达式处理的话应该是类似于下图这样的(部分正则)是非常复杂的,是不能人来维护的,之所以复杂是因为有很多种情况。这时候grok就出现了。

    grok是什么 

    IPORHOST是grok的一个内置的pattern(中文名:模式),匹配了这个pattern呢就赋值给clientip,clientip就是解析后json中的一个字段。通过这种方式就简化了使用正则去解析数据的复杂度。可以通过上图的网址查看有哪些内置的pattern。

    语法

    示例

    我们用7474端口的http的方式,再结合logstash的自动重新加载配置文件的方式去做快速调试。因为logstash重启会非常慢,如果我们改一次配置文件就重启一次logstash大量的时间就花在启动上了。

    所以:自动重新加载配置文件就直接在命令后面加上 -r

    注意:stdin无法reload

    配置文件修改我们在另一个窗口来做,我们先把不需要的注释掉。

    我们使用Insomnia或者是postman这类工具发送请求

    headers就是http请求的头部,这里我们使用mutate的remove_field把他删除掉:按下图配置做修改

    我们把最初的grok的规则代码拿过来,注意要转义一下特殊字符。

    自定义匹配规则

    如果grok的pattern满足不了我们,我们可以自定义规则

    上图的正则含义:一个10到11位的包含字符和数字组成的字段叫service_name

    我们把发送的字符串改成不符合规则的就会出现下图:

    这样写的坏处就是如果我们的规则要重复使用的话就比较繁琐了,grok提供了一个解决方案,看下图

    pattern_definitions

    作用:自定义正则表达式并可重复利用

    示例

    match匹配多种样式

    overwrite

     我们通过上面的各种演示会发现一个问题:我们的message通过grok解析之后生成了具体的字段,但是message字段还是保留了。(message和service_name字段的内容相同了)两种解决方式:

    通过上面的remove_filed把message删除在这个log里面,我想把部分信息赋给message字段,就可以用overwrite这种方法来实现。 示例 

     

    我们现在希望message只保留“123”,这个时候我们就用overwrite

    tag_on_failure

    我们知道如果匹配失败,则会默认有一个tag叫_grokparsefailure,我们后面做处理的时候就可以使用一些例如if的做判断。

    grok调试建议

    因为grok是基于正则表达式的。我们这里提供两个正则网站。

    正则写好了还要去写grok,grok也有一些调试工具。 

    如果要自定义patterns,就勾选“Add custom patterns”。

     

    16.3、dissect

    性能对比 

    从上图可以看出dissect比grok性能提升三倍左右。

    应用场景

    语法

     例子

    演示

     语法

    举例

    追加 

    追加并指定顺序 

    示例 

     

    字段名是匹配的值

    key1和key2是自己定义的,因为赋值的时候要用到。

    示例

    自动处理空的匹配值

    类型转换

    dissect和grok一样的,分割之后也都是字符串。

    16.4、mutate

    convert 

    示例

    我们注释所有的filter,直接请求一个数字,发现message是字符串格式。

    我们把123字符串改成整型

    gsub

    gsub是做内容替换的,其实就是清理数据的一个功能。比如我们拿到的数据不干净,想把这些不干净的数据去掉或者替换成另外的字符,就可以用这个。

    上面的意思是:把path字段里面的/替换成_

    把urlparams中的 \ ? # - 替换成.

    示例

    split

    示例

    join

    示例

    merge

    把source_arr合并到dest_arr 

     示例

    rename

    示例

    update/replace

    示例

    我们把a这个字段改成不存在的e

    会发现这个e没有,说明update是失败的。我们把update改成replace试试。

    remove

    一般用remove都是用来删除message字段。 因为我们对message字段做了大量的处理。

    示例

    17、Output Plugin

     17.1、stdout

    17.2、file

    主要需求是在运维上,比如我们有10台机器,我们想把10台机器的日志输出到一台机器上,方便查看,我们可以用file。

    path指定要往哪个地方存。

    codec指定格式。默认是json。上面我们指定为原始的message。如果不设定就是原始的json格式,查阅起来不方便。

    17.3、elasticsearch

    elasticsearch是基于http协议的,这里我们写hosts会去做一个负载均衡的操作,不要写elasticsearch的master node,因为logstash是做Blob插入的对性能要求还是比较高的,如果把master node写上去会对master node带来比较大的压力。

    index就是输出的索引名称,例子中我们就是按照时间来格式化。

    template、template_name、template_overwrite主要是跟模板相关的内容。

    elasticsearch的配置还有非常多的内容,具体可以详细看一下elasticsearch的插件。

    17.4、azure cosmosDB

    点我跳转到另一篇博文

    18、讲解一下文档

    https://www.elastic.co/guide/en/logstash/current/index.html

    19、调试建议

     

     

     

     

     

     

     

     

     

     

     

    Processed: 0.011, SQL: 9