Filebeat7.3基础运用

    技术2022-07-12  107

    导言

    项目主要是通过Filebeat读取日志文件,传输到Kafka上,Logstash获取Kafka的消息,过滤日志信息,传输到ElasticSearch上。实现数据的实时统计。楼主也是刚接触了ElasticSearch,有什么错误的,或者更好的操作,可以提供下,一起讨论。

    1. 下载Filebeat的基础文件(根据自己的系统选择对应的文件

    https://www.elastic.co/cn/downloads/beats/filebeat)

    2. filebeat的配置文件(filebeat.yml)

    3. 日志源是json形式

    1.D:\program\php\a.log {"ip":"168.168.1.1"} 2.D:\program\php\b.log {“ips”:"127.0.0.2"}

    4. 配置filebeat的配置文件如下

    filebeat.inputs: # 日志类型 - type: log fields: # 用于后面的识别判断 log_source: ip # 是否实时读取 enabled: true # 日志文件位置 paths: -D:\program\php\a.log # 日志文件中包含id数据进行收集 include_lines: ['id'] # 多长时间去检测新的文件生成的时间 ,默认为 10s scan_frequency: 1s - type: log fields: log_source: ips enabled: true paths: -D:\program\php\b.log # 日志文件中包含ids数据进行收集 include_lines: ['ids'] # 多长时间去检测新的文件生成的时间 ,默认为 10s scan_frequency: 1s processors: # 删除掉filebeat再传输过程中附加的多余字段"log", "ecs","agent","agent" - drop_fields: fields: ["log", "ecs","agent","agent"] # 输出源,这是是输出到kafka output.kafka: enabled: true hosts: ["127.0.0.1:9092"] # 并发负载均衡kafka输出的数量 worker: 2 topic: "ip" # 单条消息的大小,默认值为10M max_message_bytes: 1024 # 配置多个topic输出 topics: # 根据上列设置的log_source设置传输到kafka的topics - topic: "%{[fields.log_source]}" when.contains: # 日志文件中包含的内容传输 message: "ip" - topic: "%{[fields.log_source]}" when.contains: message: "ips" # 进程数量 max_procs: 2 queue.mem: # 消息队列大小,默认值为4096(filebeat最大的可能占用的内存是max_message_bytes * queue.mem.events ) events: 4096 # 发送的最小事件数,默认为 2048 flush.min_events: 512 # 最长等待时间,设为为 0,事件将立即使用 flush.timeout: 1s

    5. 运行命令 ./filebeat -e -c filebeat-test.yml (在filebeat的根目录下运行)

    6. Filebeat配置信息参数(后续补充)

    原文链接:https://blog.csdn.net/xfxf0520/article/details/101440065

    欢迎大家访问我的博客:随风飘雁的博客

    Processed: 0.010, SQL: 9