filebeat、kafka、elk搭建日志收集系统

    技术2025-08-03  15

    本次搭建日志收集系统是个小demo。用到的工具有 链接:https://pan.baidu.com/s/1m_If2crjUtMTqRKuKrG9gw  提取码:n9oi ,工具和代码工程都在都在这里。

    日志收集的流程如下。这里只是实现了到kibana

    一.配置java项目配置文件

    这里用的日志为log4j2。

    maven依赖

    <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <!-- 排除spring-boot-starter-logging --> <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </exclusion> </exclusions> </dependency> <!-- log4j2 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-log4j2</artifactId> </dependency> <dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>3.3.4</version> </dependency>

    下面是log4j2.xml,这里最重要的是对日志的格式化处理。

    <?xml version="1.0" encoding="UTF-8"?> <Configuration status="INFO" schema="Log4J-V2.0.xsd" monitorInterval="600" > <Properties> <Property name="LOG_HOME">logs</Property> <property name="FILE_NAME">collector</property> <property name="patternLayout">[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%level{length=5}] [%thread-%tid] [%logger] [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n</property> </Properties> <Appenders> <Console name="CONSOLE" target="SYSTEM_OUT"> <PatternLayout pattern="${patternLayout}"/> </Console> <RollingRandomAccessFile name="appAppender" fileName="${LOG_HOME}/app-${FILE_NAME}.log" filePattern="${LOG_HOME}/app-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log" > <PatternLayout pattern="${patternLayout}" /> <Policies> <TimeBasedTriggeringPolicy interval="1"/> <SizeBasedTriggeringPolicy size="500MB"/> </Policies> <DefaultRolloverStrategy max="20"/> </RollingRandomAccessFile> <RollingRandomAccessFile name="errorAppender" fileName="${LOG_HOME}/error-${FILE_NAME}.log" filePattern="${LOG_HOME}/error-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log" > <PatternLayout pattern="${patternLayout}" /> <Filters> <ThresholdFilter level="warn" onMatch="ACCEPT" onMismatch="DENY"/> </Filters> <Policies> <TimeBasedTriggeringPolicy interval="1"/> <SizeBasedTriggeringPolicy size="500MB"/> </Policies> <DefaultRolloverStrategy max="20"/> </RollingRandomAccessFile> </Appenders> <Loggers> <!-- 业务相关 异步logger --> <AsyncLogger name="com.an.*" level="info" includeLocation="true"> <AppenderRef ref="appAppender"/> </AsyncLogger> <AsyncLogger name="com.an.*" level="info" includeLocation="true"> <AppenderRef ref="errorAppender"/> </AsyncLogger> <Root level="info"> <Appender-Ref ref="CONSOLE"/> <Appender-Ref ref="appAppender"/> <AppenderRef ref="errorAppender"/> </Root> </Loggers> </Configuration>

    在文件中有一些自定义的属性,用MDC放入对应的属性

    MDC工具类

    package com.an.collector.util; import org.jboss.logging.MDC; import org.springframework.context.EnvironmentAware; import org.springframework.core.env.Environment; import org.springframework.stereotype.Component; @Component public class InputMDC implements EnvironmentAware { private static Environment environment; @Override public void setEnvironment(Environment environment) { InputMDC.environment = environment; } public static void putMDC() { MDC.put("hostName", NetUtil.getLocalHostName()); MDC.put("ip", NetUtil.getLocalIp()); MDC.put("applicationName", environment.getProperty("spring.application.name")); } }

    这里只是打印了几条日志进行测试

    二.filebeat安装

    filebeat抓取工程生产的log,到kafka中。

    1.解压

    tar -zxvf filebeat-6.4.3-linux-x86_64.tar.gz -C /usr/local/

    2.修改名称

    mv filebeat-6.4.3-linux-x86_64/ filebeat-6.4.3

    3.配置filebeat.yml

    vim /usr/local/filebeat-6.4.3/filebeat.yml filebeat.prospectors: - inpput_type: log paths: - /usr/local/logs/app-collector.log document_type: "app-log" multiline: pattern: '^\[' #指定匹配的表达式(匹配以[开头的表达式) negate: true #是否匹配到 match: after #合并到上一行的末尾 max_lines: 2000 # 最大的行数 timeout: 2s #如果没有新的日志,就输出 fields: logbiz: collector logtopic: app-log-collector ## 按服务划分用作kafka topic evn: dev - inpput_type: log paths: - /usr/local/logs/error-collector.log document_type: "orror-log" multiline: pattern: '^\[' #指定匹配的表达式(匹配以[开头的表达式) negate: true #是否匹配到 match: after #合并到上一行的末尾 max_lines: 2000 # 最大的行数 timeout: 2s #如果没有新的日志,就输出 fields: logbiz: collector logtopic: error-log-collector ## 按服务划分用作kafka topic evn: dev output.kafka: # Array of hosts to connect to. ebable: true hosts: ["192.168.1.101:9092"] topic: '%{[fields.logtopic]}' partition.hash: reachable_only: true compression: gzip max_message_bytes: 1000000 required_acks: 1 logging.to_files: true

    4.检查yml文件是否正确

    cd /usr/local/filebeat-6.4.3 ./filebeat -c filebeat.yml -configtest ## Config OK

    5.启动filebeat

    /usr/local/filebeat-6.6.0/filebeat & ps -ef | grep filebeat

    三.安装kafka

    1.解压

    tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module/

    2.修改名称

    mv kafka_2.11-0.11.0.0/ kafka

    3.在kafka文件夹下创建log文件夹

    mkdir logs

    4.修改配置文件

    cd config/ vim server.properties

    主要修改以下配置

    #broker的全局唯一编号,不能重复 broker.id=0 #删除topic功能使能 delete.topic.enable=true #kafka运行日志存放的路径 log.dirs=/opt/module/kafka/logs #配置连接Zookeeper集群地址 zookeeper.connect=hadoop101:2181,hadoop102:2181,hadoop103:2181

    5.修改环境变量

    sudo vi /etc/profile

    加入

    #KAFKA_HOME export KAFKA_HOME=/opt/module/kafka export PATH=$PATH:$KAFKA_HOME/bin source /etc/profile

    这里我配置的三台kafka集群,所以其他两台也按这样的步骤部署并修改配置文件。注意:配置文件中的broker.id=1broker.id=2

    6.群起kafka脚本

    case $1 in "start"){ for i in hadoop101 hadoop102 hadoop103 do echo " --------启动 $i Kafka-------" # 用于KafkaManager监控 ssh $i "export JMX_PORT=9988 && /opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties " done };; "stop"){ for i in hadoop101 hadoop102 hadoop103 do echo " --------停止 $i Kafka-------" ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh stop" done };; esac

    四.安装logstash和elasticsearch

    我的上一篇文章已经写过logstash和elasticsearch的安装。地址:https://blog.csdn.net/qq_29963323/article/details/106573303

    这里主要在logstash中加一个配置文件,以这个配置文件启动logstash

    进入logstash

    cd /usr/local/logstash-6.4.3/

    创建script文件夹并创建yml文件

    mkdir script vim logstash.yml input{ kafka{ # app-log-服务名称 topics_pattern => "app-log-.*" bootstrap_servers => "192.168.1.101:9092" codec => json consumer_threads => 4 # 增加consumer的并行消费线程数 decorate_events => true # auto_offset_rest => "latest" group_id => "app-log-group" } kafka{ # app-log-服务名称 topics_pattern => "error-log-.*" bootstrap_servers => "192.168.1.101:9092" codec => json consumer_threads => 1 # 增加consumer的并行消费线程数 decorate_events => true # auto_offset_rest => "latest" group_id => "error-log-group" } } filter{ ## 时区转换 ruby{ code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))" } if "app-log" in [fields][logtopic]{ grok{ ## 表达式 match => ["message","\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"] } } if "error-log" in [fields][logtopic]{ grok{ ## 表达式 match => ["message","\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"] } } } output{ stdout{ codec => rubydebug } } # elasticsearch output{ if "app-log" in [fields][logtopic]{ elasticsearch{ ##host=>"192.168.1.1" ##port=>"9200" ##配置ES地址 hosts=>["192.168.1.101:9200"] #用户名密码 user => "esuser" password => "123456" ##索引名字,必须小写 index=>"app-log-%{[fields][logbiz]}-%{index_time}" ##是否嗅探集群ip sniffing => true # 重写模板 template_overwrite=>true # 默认为true,false关闭ogstash自动管理模板功能,如果自定义模板,则设置为false manage_template=>false } } if "error-log" in [fields][logtopic]{ elasticsearch{ ##host=>"192.168.1.1" ##port=>"9200" ##配置ES地址 hosts=>["192.168.1.101:9200"] #用户名密码 user => "esuser" password => "123456" ##索引名字,必须小写 index=>"error-log-%{[fields][logbiz]}-%{index_time}" ##是否嗅探集群ip sniffing => true # 重写模板 template_overwrite=>true } } }

    启动logstash

    /usr/local/logstash-6.4.3/bin/logstash -f /usr/local/logstash-6.4.3/script/logstash.yml

    五.安装kibana

    1.解压

    tar -zxvf kibana-6.3.1-linux-x86_64.tar.gz -C /opt/module/

    2.修改配置

    cd kibana-6.3.1-linux-x86_64/config vi kibana.yml

    主要修改以下配置

    3.启动kibana

    ./kibana

    在进入kibana之前。查看log数据是否存入到ES中,并且查看是否ES中创建了对应的索引。

    说明数据已经在es中。kibana只是把数据可视化并进行分析

    点这里创建相对应的索引,这里选择currentDateTime

    然后这里一次在创建error-log-*的索引。

    创建之后,就可以在Discover中查看相对应额数据

     

    注意,当没有出现数据时,一定要选对时间

    Processed: 0.009, SQL: 9