序 我们知道不同中间件之间存在很多差异,如RocketMQ支持定时重试,每次重试间隔逐渐增加。 Kafka,RabbitMq不支持重试。我们要做一个统一的Spring Cloud Starter 实现效果: (1)让Kafka,RabbitMq支持消息重试。 (2)使用不同的消息中间件时,我们不需要在maven引入不同的Jar包。
开发好后的使用效果
对于不同的中间件,我们仅需在application.yaml中改一下配置信息
// 如果想使用 rocketMq paas: mq: rocket: target: 106.14.13.60 // 如果想使用 rabbitMq paas: mq: kafka: target: 106.14.13.60发送消息(ProducerServer为我们自己的生产者)
@Autowired private ProducerServer producerServer; ... producerServer.send("发送的信息");消费消息
@Component public class OnsMessageListener implements MessageListener { @Override public String getTopic() { return "TOPIC"; } @Override public String getTag() { return "TAG"; } @Override public void process(ConsumeMessage message) { System.out.println("Listener正在监听:"+message.getValueAsString()); // 如果抛出异常,会进行重试一共重试16次:10s、30s、1min、2min …… 10min、20min、30min、1h、2h,最后放入死信队列 } }我们要考虑很多步骤要做
适配不同消息队列重试机制,如何让Kafka,RabbitMq支持重试?如延时消息配合死信队列,定时任务…适配不同消息队列基本组件,如kafka有topic,但是无tag…适配不同消息队列的消费类型集群消费,广播消费…如果想实现这个效果,需要深入的了解这些中间件。这一章首先搭建一个项目框架。
有以下步骤
引入对应的依赖编写实现类编写配置文件读取类 主要注解是@ConfigruationProperties编写自动装配类在resources/META-INF/spring.factories 中配置我们的自动装配类maven install后引入依赖即可使用首先看下项目结构 mq-starter为我们要开发的sdk demo为引入mq-starter后的测试项目 开始开发吧~
引入的依赖 <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-autoconfigure</artifactId> </dependency> </dependencies>说明: 第一个依赖 主要是为编译器配置的 可以根据properties 鼠标右键 点到用这个属性的类上个 第二个依赖 主要是为了自动装配
编写自己的功能实现类,此处为实现producerServer.send()方法 由于无论KafkaProducerServer还是RocketProducerServer,都是生产者,因此我们抽出一个接口 public interface ProducerServer { /** * 发送消息 * * @param msg 消息内容 * @return 消息Id */ String send(String msg); }定义两个实现类
public class KafkaProducerServer implements ProducerServer { String target; public KafkaProducerServer(String target) { this.target = target; } @Override public String send(String msg) { System.out.println("kafka ip:" + target + " msg:" + msg); return null; } } public class RocketProducerServer implements ProducerServer { String target; public RocketProducerServer(String target) { this.target = target; } @Override public String send(String msg) { System.out.println("rocket ip:" + target + " msg:" + msg); return null; } } 编写配置文件读取类demo项目中application.yaml中定义了消息队列的配置(target ),我们要拿到demo中的配置信息,才可以进行发送
@ConfigurationProperties(prefix = "paas.mq.kafka") @Data public class KafkaAutoConfigruationProperties { private String target; } @ConfigurationProperties(prefix = "paas.mq.rocket") @Data public class RocketAutoConfigruationProperties { private String target; }这里我们要读取的配置就是paas.mq.kafka.targer/paas.mq.rocket.targer的值 @ConfigurationProperties注解的作用就是读取配置文件指定属性的值
编写自动装配类 @Configuration @EnableConfigurationProperties(KafkaAutoConfigruationProperties.class) @ConditionalOnClass(KafkaProducerServer.class) public class KafkaAutoConfigrution { @Autowired private KafkaAutoConfigruationProperties kafkaAutoConfigruationProperties; @ConditionalOnProperty("paas.mq.kafka.target") @Bean public KafkaProducerServer kafkaProducerServer() { return new KafkaProducerServer(kafkaAutoConfigruationProperties.getTarget()); } } @Configuration @EnableConfigurationProperties(RocketAutoConfigruationProperties.class) @ConditionalOnClass(RocketProducerServer.class) public class RocketAutoConfigrution { @Autowired private RocketAutoConfigruationProperties rocketAutoConfigruationProperties; @ConditionalOnProperty("paas.mq.rocket.target") @Bean public RocketProducerServer rocketProducerServer() { return new RocketProducerServer(rocketAutoConfigruationProperties.getTarget()); } }4.1.@Configuration 标识本类是配置类(相当于spring中application.xml)
4.2.@EnableConfigurationProperties(AutoConfigruationProperties.class) 如果AutoConfigruationProperties中有注解@ConfigurationProperties 那么这个类就 会被加到spring上下文的容器中,也就是可以通过@Autowire来注入
4.3.@ConditionalOnClass 当类路径下有指定类的情况下 才进行下一步
4.4.@ConditionalOnMissingBean 当spring容器中没有这个Bean的时候才进行下一步
4.5.@ConditionalOnProperty(“paas.mq.rocket.target”) 当配置文件存在“paas.mq.rocket.target”才进行下一步
我们仅需一个bean,无需同时存在RocketProducerServer,KafkaProducerServer。因此抛弃@ConditionalOnMissingBean,而使用@ConditionalOnProperty(“paas.mq.rocket.target”),选择性的产生一个Bean对象。
在resources/META-INF下添加spring.factories 指定自动装配的类也叫入口 内容如下: org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.wpp.paas.foundation.autoconfigure.mq.kafka.KafkaAutoConfigrution,\ com.wpp.paas.foundation.autoconfigure.mq.rocket.RocketAutoConfigrution 现在在demo项目中直接自动注入我们的bean就ok了