Integrating RabbitMQ with Spring Boot

    Tech  2022-07-31

    Integrating RabbitMQ with Spring Boot

    Steps: 1. Add the spring-boot-starter-amqp dependency

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    2. Configure application.properties

    spring.rabbitmq.addresses=localhost
    spring.rabbitmq.username=user
    spring.rabbitmq.password=user
    spring.rabbitmq.virtual-host=/user
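As a hedged alternative to the configuration above: `spring.rabbitmq.addresses` accepts a comma-separated list of `host:port` entries, and when it is set the separate host and port properties are ignored. For a single local broker, the same connection can be sketched with `spring.rabbitmq.host` and `spring.rabbitmq.port`. The port value 5672 is an assumption here (it is the default AMQP port) and is not stated in the original.

```properties
# Assumed equivalent of "spring.rabbitmq.addresses=localhost" for a single broker.
# 5672 is the default AMQP port; adjust it if your broker listens elsewhere.
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=user
spring.rabbitmq.password=user
spring.rabbitmq.virtual-host=/user
```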

    3. Work with RabbitMQ

    AmqpAdmin: the management component
    RabbitTemplate: the component for sending and receiving messages
    @RabbitListener: listens for messages

    Sending and receiving messages

    Inject RabbitTemplate with @Autowired and use it to send and receive messages.

    Sending messages: publisher

    Code example: MessageService.class

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;

    @Service
    public class MessageService {

        @Autowired
        RabbitTemplate rabbitTemplate;

        // Send a String message to an exchange
        public void send(String text) {
            // Three arguments: exchange, routing key, message
            rabbitTemplate.convertAndSend("amq.direct", "notice", text);
            rabbitTemplate.convertAndSend("user.fanout.register", "vx", text);
            rabbitTemplate.convertAndSend("topic.test", "topic.#", text);
        }

        // Send an object message to an exchange
        public void sendVo(Book book) {
            // Three arguments: exchange, routing key, message
            rabbitTemplate.convertAndSend("fanout.book", "book", book);
        }

        // Send Map and List messages to an exchange
        public void sendMap() {
            List<Book> list = new ArrayList<>();
            list.add(new Book("数学之美", 45, "吴军"));
            list.add(new Book("剑指offer", 57, "何海涛"));

            Map<String, Object> map = new HashMap<>();
            map.put("name", "海绵宝宝");
            map.put("list", list);
            map.put("Book", new Book("大话数据结构", 36, "程杰"));

            rabbitTemplate.convertAndSend("fanout.book", "book", list);
            rabbitTemplate.convertAndSend("fanout.book", "book", map);
        }

        // Receive one message from a queue
        public void receive() {
            Object obj = rabbitTemplate.receiveAndConvert("direct.notice");
            System.out.println("Received a message: " + obj);
        }
    }
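The call that publishes to `topic.test` uses `topic.#` as its routing key. On a topic exchange, the wildcards `*` (exactly one word) and `#` (zero or more words) are interpreted in the binding key of a queue, not in the routing key of a message. The following plain-Java sketch illustrates that matching rule without a broker. The class name `TopicMatchSketch` and the sample keys such as `topic.order.created` are hypothetical, and this is an illustration of the rule rather than RabbitMQ's actual implementation.

```java
public class TopicMatchSketch {

    // Returns true if a dot-separated routing key matches a binding pattern.
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.");
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(p, 0, k, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words of the routing key
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word, but the key has none left
        }
        // '*' matches exactly one word; anything else must match literally
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("topic.#", "topic.order.created")); // true
        System.out.println(matches("topic.#", "topic"));               // true
        System.out.println(matches("topic.*", "topic.order.created")); // false
        System.out.println(matches("topic.*", "topic.order"));         // true
    }
}
```

Under this rule, a message published with the literal routing key `topic.#` still reaches a queue bound with the pattern `topic.#`, because `#` in the pattern absorbs the remaining word. A more descriptive routing key would behave the same way for that binding.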

    Test code: Test.class

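    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;

    @SpringBootTest
    class PublisherApplicationTests {

        @Autowired
        MessageService messageService;

        @Test
        void contextLoads() {
            messageService.send("First RabbitMQ test message");

            Book book = new Book("数学之美", 45, "吴军");
            messageService.sendVo(book);

            messageService.sendMap();
            messageService.receive();
        }
    }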

    Listening for messages

    Use @RabbitListener to listen on message queues.

    Use @EnableRabbit to enable listening.

    Receiving messages: consumer

    Code example: ConsumerMessageService.class

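    // ConsumerMessageService.java
    import java.util.Map;

    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;

    @Service
    public class ConsumerMessageService {

        // One listener method can consume from several queues
        @RabbitListener(queues = {"fanout.weixin", "fanout.sms", "direct.notice", "fanout.messages", "topic.messages"})
        public void receiveMessage(String string) {
            System.out.println("Received a message: " + string);
        }

        // Note: getBook and getMap both listen on zzh.book, so each message
        // on that queue is delivered to only one of the two methods.
        @RabbitListener(queues = "zzh.book")
        public void getBook(Book book) {
            System.out.println("Received a book from the queue: " + book);
        }

        @RabbitListener(queues = "zzh.book")
        public void getMap(Map<String, Object> map) {
            System.out.println("Received a map from the queue: " + map);
        }
    }

    // ConsumerApplication.java
    import org.springframework.amqp.rabbit.annotation.EnableRabbit;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;

    @SpringBootApplication
    @EnableRabbit
    public class ConsumerApplication {

        public static void main(String[] args) {
            SpringApplication.run(ConsumerApplication.class, args);
        }
    }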

    Configuration class: converting objects to and from JSON

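    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class MessageConvertorConfiguration {

        // Replaces the default converter so that message bodies are sent as JSON
        @Bean
        public MessageConverter jackson2JsonMessageConverter() {
            return new Jackson2JsonMessageConverter();
        }
    }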
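For context on why a JSON converter is configured: without one, Spring AMQP's default `SimpleMessageConverter` handles `String`, `byte[]`, and `Serializable` payloads, and it uses Java serialization for the last of these. The plain-Java sketch below shows roughly what that means for an object such as `Book`: the payload becomes an opaque binary stream that only Java consumers with the same class can read. The nested `Book` class is a hypothetical stand-in (the original does not show its fields), and the helper names `toBytes` and `fromBytes` are invented for this illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class SerializationSketch {

    // Hypothetical stand-in for the article's Book class; field names are assumed.
    static class Book implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;
        final int price;
        final String author;

        Book(String name, int price, String author) {
            this.name = name;
            this.price = price;
            this.author = author;
        }
    }

    // Serializes a value with standard Java serialization.
    static byte[] toBytes(Serializable value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Reads a value back; the receiving side needs the same class on its classpath.
    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] payload = toBytes(new Book("数学之美", 45, "吴军"));
        Book copy = (Book) fromBytes(payload);
        System.out.println(copy.name + " / " + copy.price + " / " + copy.author);
        // Java serialization streams begin with the magic bytes AC ED
        System.out.printf("%02X %02X%n", payload[0], payload[1]);
    }
}
```

A JSON body, by contrast, is readable in the RabbitMQ management UI and can be consumed by non-Java clients, which is the usual motivation for registering `Jackson2JsonMessageConverter`.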

    The AMQP admin class

    Use the AmqpAdmin component to manage exchanges, queues, and bindings.

    Example code: AdminService.class

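    import org.springframework.amqp.core.AmqpAdmin;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;

    @Service
    public class AdminService {

        @Autowired
        AmqpAdmin amqpAdmin;

        // Create exchanges
        public void createExchange() {
            amqpAdmin.declareExchange(new DirectExchange("direct.person"));
            amqpAdmin.declareExchange(new FanoutExchange("fanout.person"));
            amqpAdmin.declareExchange(new TopicExchange("topic.person"));
        }

        // Create queues
        public void createQueue() {
            amqpAdmin.declareQueue(new Queue("queue.person"));
            amqpAdmin.declareQueue(new Queue("queue.animal"));
        }

        // Bind a queue to an exchange:
        // destination, destination type, exchange, routing key, arguments
        public void createBindings() {
            amqpAdmin.declareBinding(new Binding("queue.person", Binding.DestinationType.QUEUE, "direct.person", "person", null));
        }
    }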
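To make the effect of the binding concrete, here is a small in-memory sketch of direct-exchange routing, written in plain Java with no broker involved. It mirrors the names used above (`direct.person`, `queue.person`, `queue.animal`, routing key `person`), but the class `BindingSketch` and its methods are hypothetical and only model the routing rule: a direct exchange delivers a message to every queue whose binding key equals the message's routing key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BindingSketch {

    // exchange name -> list of {queue, bindingKey} pairs
    private final Map<String, List<String[]>> bindings = new LinkedHashMap<>();
    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    void declareQueue(String queue) {
        queues.putIfAbsent(queue, new ArrayList<>());
    }

    void bind(String queue, String exchange, String bindingKey) {
        if (!queues.containsKey(queue)) {
            throw new IllegalArgumentException("Queue not declared: " + queue);
        }
        bindings.computeIfAbsent(exchange, e -> new ArrayList<>())
                .add(new String[] {queue, bindingKey});
    }

    // Direct-exchange rule: deliver only where binding key equals routing key.
    void publishDirect(String exchange, String routingKey, String message) {
        for (String[] binding : bindings.getOrDefault(exchange, List.of())) {
            if (binding[1].equals(routingKey)) {
                queues.get(binding[0]).add(message);
            }
        }
    }

    List<String> messagesIn(String queue) {
        return queues.getOrDefault(queue, List.of());
    }

    public static void main(String[] args) {
        BindingSketch broker = new BindingSketch();
        broker.declareQueue("queue.person");
        broker.declareQueue("queue.animal");
        broker.bind("queue.person", "direct.person", "person");

        broker.publishDirect("direct.person", "person", "hello");
        broker.publishDirect("direct.person", "animal", "dropped");

        System.out.println(broker.messagesIn("queue.person")); // [hello]
        System.out.println(broker.messagesIn("queue.animal")); // []
    }
}
```

In the sketch, `queue.animal` stays empty because nothing binds it to `direct.person`. The same holds for the AdminService example, which declares `queue.animal` but creates no binding for it.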

    Test code: Test.class

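    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;

    @SpringBootTest
    class PublisherApplicationTests {

        @Autowired
        AdminService adminService;

        @Test
        void contextLoads() {
            adminService.createExchange();
            adminService.createQueue();
            adminService.createBindings();
        }
    }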