springkafka部署到使用

    技术2025-07-21  9

    http://kafka.apache.org/quickstart

    首先在你的服务器上安装kafka

    https://www.apache.org/dyn/closer.cgi?path=/kafka/2.5.0/kafka_2.12-2.5.0.tgz 注意版本别安装错

    > tar -xzf kafka_2.12-2.5.0.tgz > cd kafka_2.12-2.5.0

    启动kafka内置的zookeeper

    > nohup bin/zookeeper-server-start.sh config/zookeeper.properties >zookeeperoutput 2>&1 & [2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig) ...

    启动kafka

    > nohup bin/kafka-server-start.sh config/server.properties >kafkaoutput 2>&1 & [2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties) [2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties) ...

    nohup bin/kafka-server-start.sh -daemon ./config/server.properties 可以永久启动 如果报了内存问题,记得修改启动内存

    修改bin目录下的kafka-server-start.sh文件 修改初始堆大小:-Xms 设置小一些

    如果从本地连到服务器上的,还需要修改config/server.properties文件配置 最后修改项目配置

    server填自己的服务器地址 代码中调用

    package com.ftui.userService; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.netflix.eureka.EnableEurekaClient; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @SpringBootApplication @EnableEurekaClient public class UserServiceApp implements CommandLineRunner { public static Logger logger = LoggerFactory.getLogger(UserServiceApp.class); public static void main(String[] args) { SpringApplication.run(UserServiceApp.class,args); } @Autowired KafkaTemplate kafkaTemplate; private final CountDownLatch latch = new CountDownLatch(3); @Override public void run(String... args) throws Exception { kafkaTemplate.send("myTopic", "foo1"); kafkaTemplate.send("myTopic", "foo2"); kafkaTemplate.send("myTopic", "foo3"); latch.await(60, TimeUnit.SECONDS); logger.info("All received"); } @KafkaListener(topics = "myTopic") public void listen(ConsumerRecord<?, ?> cr) throws Exception { logger.info(cr.toString()); latch.countDown(); } }

    修改kafka连接zookeer时间 #zk连接所用时间 zookeeper.connection.timeout.ms

    修改kafka的启动端口号 修改zookeeper配置

    vim config/zookeeper.properties clientPort=2182

    修改kafka配置

    vim config/server.properties # 对应zk地址 zookeeper.connect=localhost:2182 listeners=PLAINTEXT://:9093 advertised.listeners=PLAINTEXT://127.0.0.1:9093

    vim config/connect-standalone.properties

    # These are defaults. This file just demonstrates how to override some settings. bootstrap.servers=localhost:9093

    vim config/consumer.properties

    zookeeper.connect=127.0.0.1:2182

    vim config/producer.properties

    bootstrap.servers=localhost:9093
    Processed: 0.009, SQL: 9