Spring Boot + RabbitMQ demo写法

    技术2022-07-11  78

    application.properties配置文件写法

    #rabbitmq spring.rabbitmq.virtual-host=/ spring.rabbitmq.host=192.168.124.20 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.listener.concurrency=10 spring.rabbitmq.listener.max-concurrency=20 spring.rabbitmq.listener.prefetch=5 yy.system.log.queue.name=yy.system.log.queue.name yy.system.log.exchange.name=yy.system.log.exchange.name yy.system.log.routing.key.name=yy.system.log.routing.key.name RabbitmqConfig.class配置文件 import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.env.Environment; import java.util.HashMap; import java.util.Map; /** * Created by steadyjack on 2018/8/20. 
*/ public class RabbitmqConfig { private static final Logger log= LoggerFactory.getLogger(RabbitmqConfig.class); @Autowired private Environment env; @Autowired private CachingConnectionFactory connectionFactory; @Autowired private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer; /** * 单一消费者 * @return */ @Bean(name = "singleListenerContainer") public SimpleRabbitListenerContainerFactory listenerContainer(){ SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); factory.setConcurrentConsumers(1); factory.setMaxConcurrentConsumers(1); factory.setPrefetchCount(1); factory.setTxSize(1); factory.setAcknowledgeMode(AcknowledgeMode.AUTO); return factory; } /** * 多个消费者 * @return */ @Bean(name = "multiListenerContainer") public SimpleRabbitListenerContainerFactory multiListenerContainer(){ SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factoryConfigurer.configure(factory,connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); factory.setAcknowledgeMode(AcknowledgeMode.NONE); factory.setConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.concurrency",int.class)); factory.setMaxConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.max-concurrency",int.class)); factory.setPrefetchCount(env.getProperty("spring.rabbitmq.listener.prefetch",int.class)); return factory; } @Bean public RabbitTemplate rabbitTemplate(){ connectionFactory.setPublisherConfirms(true); connectionFactory.setPublisherReturns(true); RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { 
log.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause); } }); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message); } }); return rabbitTemplate; } //TODO:系统日志消息模型 @Bean(name = "logSystemQueue") public Queue logSystemQueue(){ return new Queue(env.getProperty("yy.system.log.queue.name"),true); } @Bean public TopicExchange logSystemExchange(){ return new TopicExchange(env.getProperty("yy.system.log.exchange.name"),true,false); } @Bean public Binding logSystemBinding(){ return BindingBuilder.bind(logSystemQueue()).to(logSystemExchange()).with(env.getProperty("yy.system.log.routing.key.name")); } }

    控制器(Controller)中的用法

    //TODO:异步写用户日志 try { YySystemLog yySystemLog=new YySystemLog(); yySystemLog.setCtime(TimeUtil.currenttime("yyyy-MM-dd HH:mm:ss")); yySystemLog.setHreflink("/app/login"); yySystemLog.setModular("登录模块"); yySystemLog.setOperation("用户登录"); yySystemLog.setUsername(finduser.getUsername()); yySystemLog.setUserid(finduser.getId()+""); yySystemLog.setOperip(TimeUtil.getIpAddr(request)); yySystemLog.setRemark("用户登录成功"); rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); rabbitTemplate.setExchange(env.getProperty("yy.system.log.exchange.name")); rabbitTemplate.setRoutingKey(env.getProperty("yy.system.log.routing.key.name")); Message message= MessageBuilder.withBody(objectMapper.writeValueAsBytes(yySystemLog)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build(); message.getMessageProperties().setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME, MessageProperties.CONTENT_TYPE_JSON); //发送消息写法二 rabbitTemplate.convertAndSend(message); }catch (Exception e){ e.printStackTrace(); }

    启动文件添加 

    @Configuration
    Processed: 0.009, SQL: 9