SpringCloud二次开发-消息队列(第一篇)

    技术2023-11-21  115

    序 我们知道不同中间件之间存在很多差异,如RocketMQ支持定时重试,每次重试间隔逐渐增加。 Kafka,RabbitMq不支持重试。我们要做一个统一的Spring Cloud Starter 实现效果: (1)让Kafka,RabbitMq支持消息重试。 (2)使用不同的消息中间件时,我们不需要在maven引入不同的Jar包。

    开发好后的使用效果

    对于不同的中间件,我们仅需在application.yaml中改一下配置信息

    // 如果想使用 rocketMq paas: mq: rocket: target: 106.14.13.60 // 如果想使用 rabbitMq paas: mq: kafka: target: 106.14.13.60

    发送消息(ProducerServer为我们自己的生产者)

    @Autowired private ProducerServer producerServer; ... producerServer.send("发送的信息");

    消费消息

    @Component public class OnsMessageListener implements MessageListener { @Override public String getTopic() { return "TOPIC"; } @Override public String getTag() { return "TAG"; } @Override public void process(ConsumeMessage message) { System.out.println("Listener正在监听:"+message.getValueAsString()); // 如果抛出异常,会进行重试一共重试16次:10s、30s、1min、2min …… 10min、20min、30min、1h、2h,最后放入死信队列 } }

    我们要考虑很多步骤要做

    适配不同消息队列重试机制,如何让Kafka,RabbitMq支持重试?如延时消息配合死信队列,定时任务…适配不同消息队列基本组件,如kafka有topic,但是无tag…适配不同消息队列的消费类型集群消费,广播消费…

    如果想实现这个效果,需要深入的了解这些中间件。这一章首先搭建一个项目框架。

    有以下步骤

    引入对应的依赖编写实现类编写配置文件读取类 主要注解是@ConfigruationProperties编写自动装配类在resources/META-INF/spring.factories 中配置我们的自动装配类maven install后引入依赖即可使用

    实战

    首先看下项目结构 mq-starter为我们要开发的sdk demo为引入mq-starter后的测试项目 开始开发吧~

    引入的依赖 <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-autoconfigure</artifactId> </dependency> </dependencies>

    说明: 第一个依赖 主要是为编译器配置的 可以根据properties 鼠标右键 点到用这个属性的类上个 第二个依赖 主要是为了自动装配

    编写自己的功能实现类,此处为实现producerServer.send()方法 由于无论KafkaProducerServer还是RocketProducerServer,都是生产者,因此我们抽出一个接口 public interface ProducerServer { /** * 发送消息 * * @param msg 消息内容 * @return 消息Id */ String send(String msg); }

    定义两个实现类

    public class KafkaProducerServer implements ProducerServer { String target; public KafkaProducerServer(String target) { this.target = target; } @Override public String send(String msg) { System.out.println("kafka ip:" + target + " msg:" + msg); return null; } } public class RocketProducerServer implements ProducerServer { String target; public RocketProducerServer(String target) { this.target = target; } @Override public String send(String msg) { System.out.println("rocket ip:" + target + " msg:" + msg); return null; } } 编写配置文件读取类

    demo项目中application.yaml中定义了消息队列的配置(target ),我们要拿到demo中的配置信息,才可以进行发送

    @ConfigurationProperties(prefix = "paas.mq.kafka") @Data public class KafkaAutoConfigruationProperties { private String target; } @ConfigurationProperties(prefix = "paas.mq.rocket") @Data public class RocketAutoConfigruationProperties { private String target; }

    这里我们要读取的配置就是paas.mq.kafka.targer/paas.mq.rocket.targer的值 @ConfigurationProperties注解的作用就是读取配置文件指定属性的值

    编写自动装配类 @Configuration @EnableConfigurationProperties(KafkaAutoConfigruationProperties.class) @ConditionalOnClass(KafkaProducerServer.class) public class KafkaAutoConfigrution { @Autowired private KafkaAutoConfigruationProperties kafkaAutoConfigruationProperties; @ConditionalOnProperty("paas.mq.kafka.target") @Bean public KafkaProducerServer kafkaProducerServer() { return new KafkaProducerServer(kafkaAutoConfigruationProperties.getTarget()); } } @Configuration @EnableConfigurationProperties(RocketAutoConfigruationProperties.class) @ConditionalOnClass(RocketProducerServer.class) public class RocketAutoConfigrution { @Autowired private RocketAutoConfigruationProperties rocketAutoConfigruationProperties; @ConditionalOnProperty("paas.mq.rocket.target") @Bean public RocketProducerServer rocketProducerServer() { return new RocketProducerServer(rocketAutoConfigruationProperties.getTarget()); } }

    4.1.@Configuration 标识本类是配置类(相当于spring中application.xml)

    4.2.@EnableConfigurationProperties(AutoConfigruationProperties.class) 如果AutoConfigruationProperties中有注解@ConfigurationProperties 那么这个类就 会被加到spring上下文的容器中,也就是可以通过@Autowire来注入

    4.3.@ConditionalOnClass 当类路径下有指定类的情况下 才进行下一步

    4.4.@ConditionalOnMissingBean 当spring容器中没有这个Bean的时候才进行下一步

    4.5.@ConditionalOnProperty(“paas.mq.rocket.target”) 当配置文件存在“paas.mq.rocket.target”才进行下一步

    我们仅需一个bean,无需同时存在RocketProducerServer,KafkaProducerServer。因此抛弃@ConditionalOnMissingBean,而使用@ConditionalOnProperty(“paas.mq.rocket.target”),选择性的产生一个Bean对象。

    在resources/META-INF下添加spring.factories 指定自动装配的类也叫入口 内容如下: org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.wpp.paas.foundation.autoconfigure.mq.kafka.KafkaAutoConfigrution,\ com.wpp.paas.foundation.autoconfigure.mq.rocket.RocketAutoConfigrution 现在在demo项目中直接自动注入我们的bean就ok了
    Processed: 0.012, SQL: 9