1.Elasticsearch 2.Kibana 3.Logstash 4.Kafka:消息中间件的一种 (消息中间件参考我的另一篇博文https://blog.csdn.net/m0_45899013/article/details/106994409:) 高吞吐:处理消息的能力强,比RabbitMQ和ActiveMQ处理消息的能力强。
即:logstash从kafka拿到日志发给es,kibana来展示。
登录portainer容器检查:
1.kafka 配置文件 server.properties 修改 打开本地文件dockerfiles-master文件,src,kafka,查看配置文件server.properties,定位需要改动的地方。查看Dockerfile,定位配置文件所在位置,到虚拟机上进行修改
vi /usr/local/kafka_2.10-0.10.2.1/config/server.properties2. elasticsearch.yml 不用修改 3. kibana.yml 不用修改 4. logstash.conf 和 logstash.yml 修改
vi /usr/local/logstash-6.3.0/bin/logstash.conf vi /usr/local/logstash-6.3.0/config/logstash.yml5.修改完毕,重启4个容器。restart。 6.访问kibana
参考语法: 1.父级maven项目shop-common编写KafkaConfig、KafkaUtil类。
utils模块pom引入依赖:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> @Component @ConfigurationProperties(prefix = "kafkaConfig") public class KafkaConfig { private String moduleName; public static final class LogType{ public static String INFO="info"; public static String ERROR="error"; } public static String qgKey="qg"; public void setModuleName(String moduleName){ this.moduleName = moduleName; } public String getModuleName() { return moduleName; } } @Component public class KafkaUtil { @Autowired private KafkaTemplate kafkaTemplate; @Autowired private KafkaConfig kafkaConfig; //发送日志的正常方法 public String sendInfoMessage(String message) { try { kafkaTemplate.send(kafkaConfig.getModuleName() + "-" + KafkaConfig.LogType.INFO, KafkaConfig.qgKey, message); } catch (Exception e) { e.printStackTrace(); return "fail"; } return "success"; } //发送日志的异常方法 public String sendErrorMessage(Exception em) { try { Writer writer = new StringWriter(); PrintWriter pw = new PrintWriter(writer); em.printStackTrace(pw); kafkaTemplate.send(kafkaConfig.getModuleName() + "-" + KafkaConfig.LogType.ERROR, KafkaConfig.qgKey, writer.toString()); } catch (Exception e) { e.printStackTrace(); return "fail"; } return "success"; } }编写完毕,刷新maven,install。 2.qg-user-provider配置文件添加配置
kafkaConfig.moduleName=user3.qg-user-consumer配置文件添加配置
kafkaConfig.moduleName=user #kafka配置文件 spring.kafka.producer.bootstrap-servers=10.211.55.3:9092 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer4.qg-user-consumer编写测试类TestUserController
//用于测试kafka日志 @RestController public class TestUserController { @Autowired private KafkaUtil kafkaUtil; @RequestMapping(value = "/test") public String test(){ kafkaUtil.sendInfoMessage("this is a test function"); System.out.println("================================="); return "发送成功"; } }1.添加进一条数据,执行。 2.添加索引,保存。 3.启动qg-user-provider,启动qg-user-consumer,访问http://localhost:8082/test,出现“测试成功”,成功。刷新kibana: 4.添加新索引后索引,选择@timestamp模式,create index pattern进行应用。到discover查看: 添加完毕怎么请求呢?持续更新。。。
停掉服务,重启zookeeper。