This log collection system is a small demo. The tools and the code project are all available here: https://pan.baidu.com/s/1m_If2crjUtMTqRKuKrG9gw (extraction code: n9oi).
The log collection flow is: application logs (Log4j2) → Filebeat → Kafka → Logstash → Elasticsearch → Kibana. This demo only implements the chain up to Kibana.
The logging framework used here is Log4j2.
Maven dependencies:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <!-- exclude the default spring-boot-starter-logging -->
    <exclusions>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- log4j2 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.3.4</version>
</dependency>

Below is log4j2.xml. The most important part is the patternLayout used to format the log output.
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO" schema="Log4J-V2.0.xsd" monitorInterval="600">
    <Properties>
        <Property name="LOG_HOME">logs</Property>
        <Property name="FILE_NAME">collector</Property>
        <Property name="patternLayout">[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%level{length=5}] [%thread-%tid] [%logger] [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n</Property>
    </Properties>
    <Appenders>
        <Console name="CONSOLE" target="SYSTEM_OUT">
            <PatternLayout pattern="${patternLayout}"/>
        </Console>
        <RollingRandomAccessFile name="appAppender"
                                 fileName="${LOG_HOME}/app-${FILE_NAME}.log"
                                 filePattern="${LOG_HOME}/app-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log">
            <PatternLayout pattern="${patternLayout}"/>
            <Policies>
                <TimeBasedTriggeringPolicy interval="1"/>
                <SizeBasedTriggeringPolicy size="500MB"/>
            </Policies>
            <DefaultRolloverStrategy max="20"/>
        </RollingRandomAccessFile>
        <RollingRandomAccessFile name="errorAppender"
                                 fileName="${LOG_HOME}/error-${FILE_NAME}.log"
                                 filePattern="${LOG_HOME}/error-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log">
            <PatternLayout pattern="${patternLayout}"/>
            <Filters>
                <!-- only WARN and above end up in the error log -->
                <ThresholdFilter level="warn" onMatch="ACCEPT" onMismatch="DENY"/>
            </Filters>
            <Policies>
                <TimeBasedTriggeringPolicy interval="1"/>
                <SizeBasedTriggeringPolicy size="500MB"/>
            </Policies>
            <DefaultRolloverStrategy max="20"/>
        </RollingRandomAccessFile>
    </Appenders>
    <Loggers>
        <!-- business loggers (async) -->
        <AsyncLogger name="com.an" level="info" includeLocation="true">
            <AppenderRef ref="appAppender"/>
            <AppenderRef ref="errorAppender"/>
        </AsyncLogger>
        <Root level="info">
            <AppenderRef ref="CONSOLE"/>
            <AppenderRef ref="appAppender"/>
            <AppenderRef ref="errorAppender"/>
        </Root>
    </Loggers>
</Configuration>

The pattern references a few custom fields (hostName, ip, applicationName); they are put into the MDC so the %X{...} placeholders can resolve them.
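With this patternLayout every log event is rendered on a single line, which is exactly what the grok expression in the Logstash section further down parses. A hypothetical example line (all values are made up) looks roughly like this:

[2020-06-09T21:30:15.023+08:00] [INFO] [http-nio-8080-exec-1-26] [com.an.collector.controller.IndexController] [localhost.localdomain] [192.168.1.10] [collector] [IndexController.java,21,com.an.collector.controller.IndexController,index] [this is an info log] ## ''

When no exception is attached, %ex renders as an empty string, so the line ends with ## ''; with an exception it ends with the stack trace wrapped in single quotes. The grok pattern later handles both cases.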
The MDC utility class:
package com.an.collector.util;

import org.jboss.logging.MDC;
import org.springframework.context.EnvironmentAware;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

@Component
public class InputMDC implements EnvironmentAware {

    private static Environment environment;

    @Override
    public void setEnvironment(Environment environment) {
        InputMDC.environment = environment;
    }

    /**
     * Puts hostName, ip and applicationName into the MDC so the
     * %X{hostName} / %X{ip} / %X{applicationName} fields in the log4j2
     * pattern are populated for the current thread.
     * NetUtil is a small helper class from the demo project that resolves
     * the local host name and IP.
     */
    public static void putMDC() {
        MDC.put("hostName", NetUtil.getLocalHostName());
        MDC.put("ip", NetUtil.getLocalIp());
        MDC.put("applicationName", environment.getProperty("spring.application.name"));
    }
}

For testing, I just print a few log lines.
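The test code itself is not shown above; a minimal sketch of a controller that fills the MDC and writes a few log lines could look like the following (the class name, request mapping and messages are invented for illustration):

package com.an.collector.controller;

import com.an.collector.util.InputMDC;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class IndexController {

    private static final Logger log = LoggerFactory.getLogger(IndexController.class);

    @GetMapping("/index")
    public String index() {
        // fill hostName / ip / applicationName so the %X{...} fields in the pattern resolve
        InputMDC.putMDC();
        log.info("this is an info log");
        log.warn("this is a warn log");
        log.error("this is an error log");
        return "idx";
    }
}

Hitting /index a few times produces entries in app-collector.log, and the warn/error lines additionally land in error-collector.log.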
Filebeat picks up the logs produced by the project and ships them to Kafka.
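The contents of filebeat.yml are not listed here; a minimal sketch that matches what the Logstash configuration later in this post expects (topics named app-log-* / error-log-*, plus the fields.logbiz and fields.logtopic fields) might look like the following. The log paths, the Kafka address and the "collector" service name are assumptions:

filebeat.inputs:
- type: log
  paths:
    - /usr/local/logs/app-collector.log        # app log written by log4j2 (assumed path)
  multiline.pattern: '^\['                     # every log event starts with "["
  multiline.negate: true
  multiline.match: after                       # glue stack-trace lines onto the previous event
  fields:
    logbiz: collector                          # used by Logstash to build the index name
    logtopic: app-log-collector                # Kafka topic, must match the app-log-* pattern

- type: log
  paths:
    - /usr/local/logs/error-collector.log      # error log (assumed path)
  multiline.pattern: '^\['
  multiline.negate: true
  multiline.match: after
  fields:
    logbiz: collector
    logtopic: error-log-collector              # must match the error-log-* pattern

output.kafka:
  enabled: true
  hosts: ["192.168.1.101:9092"]
  topic: '%{[fields.logtopic]}'                # route each input to its own topic
  required_acks: 1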
Check that the filebeat.yml configuration is valid:
cd /usr/local/filebeat-6.4.3
./filebeat -c filebeat.yml -configtest
## Config OK

Next comes Kafka. The main changes to its server.properties are:
# globally unique broker id; must not be duplicated
broker.id=0
# allow topics to be deleted
delete.topic.enable=true
# directory where Kafka stores its log data
log.dirs=/opt/module/kafka/logs
# ZooKeeper cluster connection string
zookeeper.connect=hadoop101:2181,hadoop102:2181,hadoop103:2181

Then add the following to /etc/profile:
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin

source /etc/profile

I run a three-node Kafka cluster here, so deploy the other two brokers the same way and adjust their configuration files. Note: set broker.id=1 and broker.id=2 in their respective server.properties.
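With the brokers up, you can verify that Filebeat is actually delivering messages using the standard Kafka CLI tools. The topic name below (app-log-collector) is the one assumed in the Filebeat sketch above:

# start the broker on each node
bin/kafka-server-start.sh -daemon config/server.properties

# list topics (they are auto-created on the first publish if topic auto-creation is enabled)
bin/kafka-topics.sh --zookeeper hadoop101:2181 --list

# consume a few messages to confirm Filebeat is shipping log lines
bin/kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic app-log-collector --from-beginning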
I covered installing Logstash and Elasticsearch in my previous post: https://blog.csdn.net/qq_29963323/article/details/106573303
Here the main step is to add a configuration file to Logstash and start Logstash with that file.
Go into the Logstash directory:
cd /usr/local/logstash-6.4.3/

Create a script folder and the configuration file inside it:
mkdir script
vim logstash.yml

input {
  kafka {
    # topics are named app-log-<service name>
    topics_pattern => "app-log-.*"
    bootstrap_servers => "192.168.1.101:9092"
    codec => json
    consumer_threads => 4          # number of parallel consumer threads
    decorate_events => true
    # auto_offset_reset => "latest"
    group_id => "app-log-group"
  }
  kafka {
    # topics are named error-log-<service name>
    topics_pattern => "error-log-.*"
    bootstrap_servers => "192.168.1.101:9092"
    codec => json
    consumer_threads => 1
    decorate_events => true
    # auto_offset_reset => "latest"
    group_id => "error-log-group"
  }
}

filter {
  ## timezone conversion; index_time is used to build the daily index name
  ruby {
    code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"
  }
  if "app-log" in [fields][logtopic] {
    grok {
      ## expression matching the log4j2 patternLayout defined above
      match => ["message","\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
    }
  }
  if "error-log" in [fields][logtopic] {
    grok {
      ## same expression for the error log
      match => ["message","\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
    }
  }
}

output {
  stdout {
    codec => rubydebug
  }
}

# elasticsearch
output {
  if "app-log" in [fields][logtopic] {
    elasticsearch {
      ## Elasticsearch address
      hosts => ["192.168.1.101:9200"]
      # credentials
      user => "esuser"
      password => "123456"
      ## index name, must be lowercase
      index => "app-log-%{[fields][logbiz]}-%{index_time}"
      ## sniff the cluster nodes
      sniffing => true
      # overwrite the template
      template_overwrite => true
      # defaults to true; false disables Logstash's automatic template management
      # (set it to false when providing a custom template)
      manage_template => false
    }
  }
  if "error-log" in [fields][logtopic] {
    elasticsearch {
      ## Elasticsearch address
      hosts => ["192.168.1.101:9200"]
      # credentials
      user => "esuser"
      password => "123456"
      ## index name, must be lowercase
      index => "error-log-%{[fields][logbiz]}-%{index_time}"
      ## sniff the cluster nodes
      sniffing => true
      # overwrite the template
      template_overwrite => true
    }
  }
}

Start Logstash with this configuration file:
/usr/local/logstash-6.4.3/bin/logstash -f /usr/local/logstash-6.4.3/script/logstash.yml

Next, install Kibana and point it at Elasticsearch; the main change is its configuration file, kibana.yml.
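A minimal kibana.yml for Kibana 6.4.3, assuming the same Elasticsearch address and credentials used in the Logstash output above, could look like this:

server.host: "0.0.0.0"            # listen on all interfaces so the UI is reachable remotely
server.port: 5601                 # default Kibana port
elasticsearch.url: "http://192.168.1.101:9200"
elasticsearch.username: "esuser"
elasticsearch.password: "123456"

After editing it, start Kibana and open port 5601 of that host in a browser.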
Before going into Kibana, check that the log data has been written to ES and that the corresponding indices have been created.
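One quick way to check from the command line, assuming the Elasticsearch address and credentials from the Logstash output above (the exact index names depend on fields.logbiz and the current date):

# list the indices and look for the app-log-* / error-log-* entries created by Logstash
curl -u esuser:123456 'http://192.168.1.101:9200/_cat/indices?v'

# look at one document to confirm the grok fields (currentDateTime, level, ...) were parsed
curl -u esuser:123456 'http://192.168.1.101:9200/app-log-*/_search?size=1&pretty'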
This shows the data is already in ES; Kibana only visualizes it and lets you analyze it.
Click here to create the corresponding index pattern, choosing currentDateTime as the time field.
Then create the error-log-* index pattern in the same way.
Once the index patterns are created, you can view the corresponding data in Discover.
Note: if no data shows up, make sure you have selected the right time range.