.我们为什么要事件总线,对于一个大型的项目这点是很重要的,以为服务之间用MQ的地方很多,每次都要写重复的代码,导致代码的可读性差,而且加大了代码的复杂性。对其进行封装后,这些都可以解决。那么我们开始进行封装吧 2. 首先我们需要spring_boot整合的rabbit MQ包
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> 进行封装(架构师都这样干~)我们定义一些常量,这点很重要,因为后面我们监听队列的时候只需要用到我们定义的常量就行了 public interface EventConstact { /** * 创建交换机的名称 */ String EXCHANGE_NAME="event-exchange"; /** * 事件类型常量 */ String EVENT_HOTEL_INSERT="hotel_insert"; }按理来说你的事件是可以无限增加的,业务需求也是,所以你可以定义很多事件类型
大家都知道,其实MQ里面是有发布消息的发布者和事件类型的监听者这么两个角色的,现在我们有一个问题把,消息队列是由谁创建?交换机是谁创建?以及绑定交换机路由键等问题。这里就涉及到一个先后创建交换机和路由键绑定等问题。我是这么来解决的,交换机的话发布者和事件监听者都进行创建,谁先启动谁创建,因为交换机名称我已经在前面的接口中定死了,多次创建也只是覆盖不会出现运行报错,路由键与交换机和队列的绑定,我用每个服务的appliction.name来做队列名,然后路由键的名字的话,我们可以事先在前面的接口中定义好。这样就可以实现这三者的绑定了。下面是我代码的实现。创建交换机
import com.qf.event.constact.EventConstact; import org.springframework.amqp.core.DirectExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; //创建交换机 @Configuration public class EventPublishConfiguration { /** *事件发布者需要创建 * 事件接收者也需要创建 * * 创建一个交换机 */ @Bean public DirectExchange getExchange(){ return new DirectExchange(EventConstact.EXCHANGE_NAME,true,false); } }创建队列。以及交换机,路由键的绑定
@Configuration @ConditionalOnBean(EventListener.class) public class EventConsumerConfiguration { //保证每种服务的name名一样 @Value("${spring.application.name}") public String name; //获得所有EventListener对象 @Autowired public List<EventListener> eventListeners; @Autowired SpringContextUtil springContextUtil; /** * 创建队列(消费者来创建) */ @Bean public Queue getQueue(){ return new Queue(name+"-queue",true,false,false); } /** * 队列和交换机的绑定 * @param getQueue 默认是方法名 * @param getExchange 默认是方法名 * @return */ @Bean public Binding getBinding(Queue getQueue, DirectExchange getExchange){ //循环所有的EventListener实现类 for (EventListener eventListener : eventListeners) { //获得当前处理器需要处理的事件类型-路由键 String eventType = eventListener.gteEventType(); System.out.println("绑定的路由键:"+eventType); //队列通过路由键绑定交换机 Binding binding= BindingBuilder.bind(getQueue).to(getExchange).with(eventType); //动态将binding对象注入spring容器中 springContextUtil.registerBean(eventType+eventListener.hashCode(),binding); } return null; } }手动把binding对象注册到springioc容器中
import org.springframework.beans.BeansException; import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.support.BeanDefinitionRegistry; import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor; import org.springframework.stereotype.Component; import java.util.function.Supplier; /** * Spring -> IOC -> Bean工厂 * Map<beanName, BeanDefintion> * * 手动将Bean注册到IOC容器中 * * Bean -> BeanDefition * */ @Component public class SpringContextUtil implements BeanDefinitionRegistryPostProcessor { //注册bean的核心对象 private BeanDefinitionRegistry beanDefinitionRegistry; /** * 自定义工具方法 - 注册Bean对象 */ public void registerBean(String beanName, Object bean){ //将bean封装成BeanDefinition对象 BeanDefinitionBuilder beanDefition = BeanDefinitionBuilder.genericBeanDefinition(bean.getClass(), new Supplier() { @Override public Object get() { return bean; } }); //将BeanDefintion注册到Spring容器中 beanDefinitionRegistry.registerBeanDefinition(beanName, beanDefition.getBeanDefinition()); } @Override public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException { this.beanDefinitionRegistry = registry; } @Override public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException { } }发布事件
import com.qf.event.constact.EventConstact; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; //发布者把事件广播出去 @Component public class EventUtil { @Autowired private RabbitTemplate rabbitTemplate; public void publishEvent(String evenType,Object msg){ //第一个参数是交换机的名称,第二是事件名称(类型),第三是事件内容 System.out.println("广播了一个事件:"+evenType); rabbitTemplate.convertAndSend(EventConstact.EXCHANGE_NAME,evenType,msg); } }下面就是核心方法了,监听方法
//Rabbitmq监听器 @Component @ConditionalOnBean(EventListener.class) public class RabbitMQListener { @Autowired private List<EventListener> eventListeners; /** * 监听指定队列 */ @RabbitListener(queues = "${spring.application.name}-queue") public void msgHandler(Message message){ //获得发布消息的路由键 - 事件类型 String routingKey = message.getMessageProperties().getReceivedRoutingKey(); //交给处EvenListener理器处理 for (EventListener eventListener : eventListeners) { //判断事件类型是否匹配 if (eventListener.gteEventType().equals(routingKey)){ //获得队列中的消息 byte[] body = message.getBody(); //反序列化后用EventListener对象中的evenHandler方法来处理该消息 eventListener.eventHandler(SerializationUtils.deserialize(body)); } } } }监听者实现这个方法就可以得到发布的事件了
/** * 接收者必须实现 */ public interface EventListener<T> { /** * 监听的事件类型 */ String gteEventType(); /** * 事件处理的方法 */ void eventHandler(T msg); }这里就完成了对MQ事件总线的封装了,喜欢的可以点个赞。