步骤: 1.引入spring-boot-starter-amqp
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>2.application.properties配置
spring.rabbitmq.addresses=localhost spring.rabbitmq.username=user spring.rabbitmq.password=user spring.rabbitmq.virtual-host=/user3.操作RabbitMQ
AmqpAdmin:管理组件RabbitTemplate:消息发送处理组件@RabbitListener:监听消息@Autowired注入RabbitTemplate,完成收发消息操作
发消息 publisher
代码示例:MessageService.clss
@Service public class MessageService { @Autowired RabbitTemplate rabbitTemplate; //向exchange中发送 字符串 消息 public void send(String text){ //传三个参数 exchange、rountkey,message rabbitTemplate.convertAndSend("amq.direct", "notice", text); rabbitTemplate.convertAndSend("user.fanout.register", "vx", text); rabbitTemplate.convertAndSend("topic.test", "topic.#", text); } //向exchange中发送 对象 消息 public void sendVo(Book book){ //传三个参数 exchange、rountkey,message rabbitTemplate.convertAndSend("fanout.book", "book", book); } //向exchange中发送 Map,List 消息 public void sendMap(){ List<Book> list = new ArrayList<Book>(); list.add(new Book("数学之美", 45, "吴军")); list.add(new Book("剑指offer", 57, "何海涛")); Map<String, Object> map = new HashMap<String, Object>(); map.put("name","海绵宝宝"); map.put("list",list); map.put("Book", new Book("大话数据结构", 36, "程杰")); rabbitTemplate.convertAndSend("fanout.book", "book", list); rabbitTemplate.convertAndSend("fanout.book", "book", map); } //从队列中收到消息 public void receive(){ Object obj = rabbitTemplate.receiveAndConvert("direct.notice"); System.out.println("收到一条消息:"+obj); } }测试代码:Test.class
@SpringBootTest class PublisherApplicationTests { @Autowired MessageService messageService; @Test void contextLoads() { messageService.send("rabbitiMQ的第一条测试"); Book book = new Book("数学之美",45, "吴军"); messageService.sendVo(book); messageService.sendMap(); messageService.receive(); } }使用@RabbitListener监听消息队列
使用@EnableRabbit来启动监听
收消息 consumer
代码示例:ConsumerMessageService.class
@Service public class ConsumerMessageService { @RabbitListener(queues = {"fanout.weixin", "fanout.sms", "direct.notice", "fanout.messages","topic.messages"}) public void receiveMessage(String string){ System.out.println("收到消息:"+string); } @RabbitListener(queues = "zzh.book") public void getBook(Book book){ System.out.println("从队列中收到一个book"+book); } @RabbitListener(queues = "zzh.book") public void getMap(Map map){ System.out.println("从队列中收到一个map:"+map); } } @SpringBootApplication @EnableRabbit public class ConsumerApplication { public static void main(String[] args) { SpringApplication.run(ConsumerApplication.class, args); } }整合配置类 对象与Json格式的转换
@Configuration public class MessageConvertorConfiguration { @Bean public MessageConverter jackson2JsonMeessageConverter(){ Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter(); return converter; } }使用AmqpAdmin组件对消息队列的管理
示例代码:AdminService.class
@Service public class AdminService { @Autowired AmqpAdmin amqpAdmin; //创建Exchange public void createExchange(){ amqpAdmin.declareExchange(new DirectExchange("direct.person")); amqpAdmin.declareExchange(new FanoutExchange("fanout.person")); amqpAdmin.declareExchange(new TopicExchange("topic.person")); } //创建Queue public void createQueue(){ amqpAdmin.declareQueue(new Queue("queue.person")); amqpAdmin.declareQueue(new Queue("queue.animal")); } //Exchange和Queue的绑定 public void createBindings(){ amqpAdmin.declareBinding(new Binding("queue.person",Binding.DestinationType.QUEUE,"direct.person","person", null)); } }测试代码:Test.class
@SpringBootTest class PublisherApplicationTests { @Autowired AdminService adminService; @Test void contextLoads() { adminService.createExchange(); adminService.createQueue(); adminService.createBindings(); } }