RabbitMQ可靠消息队列实现

    技术2025-12-11  24

    RabbitMQ可靠消息队列实现

    安装架构核心概念ServerConnectionExchangeBindingRouting keyQueue SpringBoot整合SpringBoot版本maven谁来创建队列生产者ymlRabbitMqConfigMessageProducerConfirmCallbackReturnCallback队列消息信息SQL常量重新投递 消费者自动确认ymlMqConsumer 消费者手动动确认ymlMqConsumer 消费者确认机制测试ConfirmCallback测试ReturnCallback测试消费端重试机制测试

    安装

    基于Dokcer容器

    架构

    核心概念

    Server

    又称Broker, 接受客户端的连接,实现AMQP实体服务,这里指RabbitMQ 服务器

    Connection

    连接,应用程序与Broker的网络连接。

    Channel:网络信道,几乎所有的操作都在 Channel 中进行,Channel是进行消息读写的通道。客户端可建立多个Channel:,每个Channel代表一个会话任务。Virtual host:虚拟主机,用于逻辑隔离。一个虚拟主机里面可以有若干个Exchange和Queue,同一个虚拟主机里面不能有相同名称的Exchange或Queue。

    Exchange

    交换器,接收消息,按照路由规则将消息路由到一个或者多个队列。如果路由不到,或者返回给生产者,或者直接丢弃。RabbitMQ常用的交换器常用类型有direct、topic、fanout、headers四种。

    Direct Exchange:该类型的交换器将所有发送到该交换器的消息被转发到RoutingKey指定的队列中,也就是说路由到BindingKey和RoutingKey完全匹配的队列中。 Topic Exchange:该类型的交换器将所有发送到Topic Exchange的消息被转发到所有RoutingKey中指定的Topic的队列上面。 Exchange将RoutingKey和某Topic进行模糊匹配,其中“*”用来匹配一个词,“#”用于匹配一个或者多个词。例如“com.#”能匹配到“com.rabbitmq.oa”和“com.rabbitmq”;而"com.*"只能匹配到“com.rabbitmq”。 Fanout Exchange:该类型不处理路由键,会把所有发送到交换器的消息路由到所有绑定的队列中。优点是转发消息最快,性能最好。 Headers Exchange:该类型的交换器不依赖路由规则来路由消息,而是根据消息内容中的headers属性进行匹配。headers类型交换器性能差,在实际中并不常用。

    Binding

    Exchange 和 Queue 之间的虚拟连接,binding 中可以包含 routing key。

    Routing key

    路由规则,虚拟机可用它来确定如何路由一个特定消息,即交换机绑定到 Queue 的键。

    Queue

    也称为Message Queue,消息队列,保存消息并将它们转发给消费者。

    SpringBoot整合

    SpringBoot版本

    2.2.7.RELEASE

    maven

    <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

    谁来创建队列

    消息重要时,生产者和消费者都要创建队列。避免造成消息延迟或丢失。 消息不重要时,消费者创建队列。避免造成在没有消费者时,生产者发送的消息在MQ积压。

    生产者

    1.生产者生产消息后投递到队列之前,先将消息信息保存到数据库,状态为发送中。 2.队列将消息投递到Exchange,并触发ConfirmCallback回调。 3.判断ConfirmCallback回调返回的ack信息,为true时删除DB中对应的信息,为false时修改状态为发送失败。 4.Exchange通过匹配Routing key将消息转发到指定的队列中。 5.没有匹配的队列时通过ReturnCallback通知生产者,记录日志或保存到死信队列。 6.启动定时任务将数据库中ConfirmCallback失败的消息重新投递到Exchange,为防止死循环应该设置一个最大重试次数。

    yml

    spring: rabbitmq: addresses: ***.***.***.*** port: 5672 username: admin password: ****** #开启 confirm 确认机制 publisher-confirm-type: correlated #开启 return 确认机制 publisher-returns: true template: #设置为 true 后 消费者在消息没有被路由到合适队列情况下会被return监听,而不会自动删除 mandatory: true

    RabbitMqConfig

    @Configuration public class RabbitMqConfig { @Autowired private MsgSendReturnCallBack msgSendReturnCallBack; @Autowired private MsgSendConfirmCallBack msgSendConfirmCallBack; /** * 消息确认,生产者→mq * * @param connectionFactory * @return */ @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setConfirmCallback(msgSendConfirmCallBack); template.setReturnCallback(msgSendReturnCallBack); return template; } }

    MessageProducer

    /** * @Author: World哥 * @Description: * @Date: Create in 15:42 2020/1/13 */ @Slf4j @Service public class MessageProducer { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private MsgBroberService msgBroberService; /** * 发送消息 * * @param obj 消息对象 */ public void send(String exchange, String routingKey, Object obj) { String uuid = UUID.randomUUID().toString(); CorrelationData correlationId = new CorrelationData(uuid); String msgJson = JsonMapper.INSTANCE.toJson(obj); Message message = MessageBuilder.withBody(msgJson.getBytes()).build(); // 消息持久化 message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); // json数据 message.getMessageProperties().setContentType(MessageProperties.CONTENT_TYPE_JSON); // 消息落库 MsgBrober msgBrober = MsgBrober.builder() .corrId(uuid) .message(msgJson) .status(MsgBroberConstant.SENDING_STATUS) .exchange(exchange) .rountingKey(routingKey) .build(); msgBroberService.save(msgBrober); try { rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationId); log.info("消息正在发送。。。"); } catch (Exception e) { log.error("消息发送失败, e={}", e.getMessage()); msgBroberService.update(Wrappers.<MsgBrober>lambdaUpdate() .set(MsgBrober::getReason, e.getMessage()) .set(MsgBrober::getStatus, MsgBroberConstant.FAIL_STATUS) .eq(MsgBrober::getCorrId, uuid)); } } public void send(MsgBrober msgBrober) { CorrelationData correlationId = new CorrelationData(msgBrober.getCorrId()); Message message = MessageBuilder.withBody(msgBrober.getMessage().getBytes()).build(); // 消息持久化 message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); // json数据 message.getMessageProperties().setContentType(MessageProperties.CONTENT_TYPE_JSON); rabbitTemplate.convertAndSend(msgBrober.getExchange(), msgBrober.getRountingKey(), message, correlationId); } }

    ConfirmCallback

    /** * @ClassName MsgSendConfirmCallBack * @Description 消息发送到交换机确认机制回调 * @Author World哥 * @Date 2019/8/5 15:49 * @Version 1.0 **/ @Slf4j @Component public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback { @Autowired private MsgBroberService msgBroberService; @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { log.info("消息已成功投递,MsgSendConfirmCallBack, 回调id={}, ack={}, returnedMessage={}",correlationData, ack, correlationData.getReturnedMessage()); msgBroberService.remove(Wrappers.<MsgBrober>lambdaQuery().eq(MsgBrober::getCorrId, correlationData.getId())); } else { log.error("MsgSendConfirmCallBack, 回调id={}, ack={}, returnedMessage={}", correlationData, ack, correlationData.getReturnedMessage()); msgBroberService.update(Wrappers.<MsgBrober>lambdaUpdate() .set(MsgBrober::getStatus, MsgBroberConstant.FAIL_STATUS) .set(MsgBrober::getReason, cause) .eq(MsgBrober::getCorrId, correlationData.getId())); } } }

    ReturnCallback

    /** * @Author: World哥 * @Description:消息从交换器发送到对应队列失败时触发,指定的routingKey找不到队列时会触发 * @Date: Create in 16:03 2020/1/13 */ @Slf4j @Component public class MsgSendReturnCallBack implements RabbitTemplate.ReturnCallback { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routing) { log.error("消息主题 message={}", message); log.error("回复代码 reply={}", replyCode); log.error("回复信息 replyText={}", replyText); log.error("使用的 exchange={}", exchange); log.error("路由 routing={}", routing); } }

    队列消息信息

    @Data @Builder @NoArgsConstructor @AllArgsConstructor @TableName(value = "m_msg_broker") public class MsgBrober { @TableId(type = IdType.AUTO) private int id; private int retry; // 尝试投递次数 private int status; // 状态,0-消息正在发送;1-消息发送成功;2-消息发送失败 private String reason; // 失败原因 private String corrId; // 消息唯一标识 private String message;// 消息体 private String exchange;// 交换器 private String rountingKey; //路由键 private Date createTime; private Date updateTime; }

    SQL

    -- ---------------------------- -- Table structure for m_msg_broker -- ---------------------------- DROP TABLE IF EXISTS `m_msg_broker`; CREATE TABLE `m_msg_broker` ( `id` int(11) NOT NULL AUTO_INCREMENT, `retry` tinyint(4) NOT NULL DEFAULT 0 COMMENT '尝试投递次数', `status` tinyint(4) NOT NULL DEFAULT 1 COMMENT '此时的消息的状态', `reason` varchar(1000) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '' COMMENT '失败原因', `corr_id` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '' COMMENT '消息唯一标识', `message` varchar(10000) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '' COMMENT '消息体', `exchange` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '' COMMENT '交换器', `rounting_key` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '' COMMENT '路由键', `create_time` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0), `update_time` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) ON UPDATE CURRENT_TIMESTAMP(0), PRIMARY KEY (`id`) USING BTREE, UNIQUE INDEX `msg_broker_corr_id_uindex`(`corr_id`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = 'rabbit消息' ROW_FORMAT = Dynamic; SET FOREIGN_KEY_CHECKS = 1;

    常量

    /** * @Author: World哥 * @Description: * @Date: Create in 15:08 2020/1/14 */ public class MsgBroberConstant { public static final int SENDING_STATUS = 0; //消息正在发送 public static final int SUCCESS_STATUS = 1; // 消息发送成功 public static final int FAIL_STATUS = 2; // 消息发送失败 public static final int NOT_FOUNT = -1; //数据不存在 public static final int FIRST_RETRY = 1; //第一次重试 public static final int MAX_RETRY = 3; //重试上限 } /** * @Author: World哥 * @Description: * @Date: Create in 10:50 2020/7/3 */ public class RabbitConstant { public static final String TEST_EXCHANGE = "test_exchange.direct"; /** * 车辆入场信息队列 */ public static final String CARIN_QUEUE = "carin"; public static final String CARIN_KEY = "carin_key"; }

    重新投递

    /** * @Author: World哥 * @Description: * @Date: Create in 16:00 2020/1/14 */ @Slf4j @EnableScheduling @Component public class RetryMessageTasker { @Autowired private MsgBroberService msgBroberService; @Autowired private MessageProducer messageProducer; @Scheduled(initialDelay = 5 * 1000L, fixedDelay = 10 * 1000L) public void retrySendFailMessage() { List<MsgBrober> list = msgBroberService.list( Wrappers.<MsgBrober>lambdaQuery() .eq(MsgBrober::getStatus, MsgBroberConstant.FAIL_STATUS) .lt(MsgBrober::getRetry, MsgBroberConstant.MAX_RETRY) ); if (!CollectionUtils.isEmpty(list)) { list.stream().forEach( msg -> { try { messageProducer.send(msg); log.info("失败消息重发中。。。"); } catch (Exception e) { log.error("消息重发失败"); msg.setReason(e.getMessage()); } // 更新发送次数 msg.setRetry(msg.getRetry() + 1); msgBroberService.updateById(msg); }); } } }

    消费者自动确认

    yml

    spring: rabbitmq: addresses: ***.***.***.*** port: 5672 username: admin password: ***** listener: simple: acknowledge-mode: auto #消费端自动确认 retry: enabled: true #开启消费端重试机制 max-attempts: 4 #最大重试次数,包含首次消费消息 initial-interval: 3000 #每次间隔时间 multiplier: 2 #应用于前一重试间隔的乘法器。 default-requeue-rejected: true # 重试次数超过上面的设置之后是否丢弃(false不丢弃时需要写相应代码将该消息加入死信队列)

    MqConsumer

    @Slf4j @Component public class MqConsumer { /** * QueueBinding将Queue绑定到Exchange,通过key匹配 * durable=true时设置EXCHANGE和QUEUE为持久的, * exclusive:将一个Queue声明成为排他性的, * ①只对首次声明它的连接(Connection)可见。 * ②会在其连接断开的时候自动删除。而不管这个队列是否被声明成持久性的(Durable =true)。 * 也就是说即使客户端程序将一个排他性的队列声明成了Durable的,只要调用了连接的Close方法或者客户端程序退出了, * RabbitMQ都会删除这个队列。注意这里是连接断开的时候,而不是通道断开。这个其实前一点保持一致,只区别连接而非通道。 * autoDelete订阅该队列的消费者下线后,该队列会自动删除 * @param body */ @RabbitListener(bindings = @QueueBinding( exchange = @Exchange(name = RabbitConstant.TEST_EXCHANGE, type = ExchangeTypes.DIRECT,durable = "true", autoDelete = "false"), key = RabbitConstant.TEST_KEY, value = @Queue(name = RabbitConstant.TEST_QUEUE, durable = "true", exclusive = "false", autoDelete = "false") ), concurrency = "1-10") public void test(@Payload String body) { log.info("接收到消息,body={}", body); }

    消费者手动动确认

    yml

    spring: rabbitmq: addresses: ***.***.***.*** port: 5672 username: admin password: ***** listener: simple: acknowledge-mode: manual

    MqConsumer

    @RabbitListener(bindings = @QueueBinding( exchange = @Exchange(name = RabbitConstant.TEST_EXCHANGE, type = ExchangeTypes.DIRECT, durable = "true", autoDelete = "false"), key = RabbitConstant.TEST_KEY, value = @Queue(name = RabbitConstant.TEST_QUEUE, durable = "true", exclusive = "false", autoDelete = "false") ), concurrency = "1-10") public void test(Channel channel,@Payload String body, @Header(AmqpHeaders.DELIVERY_TAG) long tag,@Header(AmqpHeaders.REDELIVERED) boolean redelivered ) throws IOException { log.info("接收到消息,body={}", body); try { int i = 10 / 0; /** * 无异常就确认消息 * basicAck(long deliveryTag, boolean multiple) * deliveryTag:取出来当前消息在队列中的的索引; * multiple:为true的话就是批量确认,如果当前deliveryTag为5,那么就会确认 * deliveryTag为5及其以下的消息;一般设置为false */ channel.basicAck(tag, true); } catch (Exception e) { if (redelivered) { log.error("消息已重复处理失败,拒绝再次接收,{}", body); // 拒绝消息,requeue=false 表示不再重新入队,如果配置了死信队列则进入死信队列 channel.basicReject(tag, false); } else { log.error("carIn message Error", e); /** * 有异常就绝收消息 * basicNack(long deliveryTag, boolean multiple, boolean requeue) * requeue:true为将消息重返当前消息队列,还可以重新发送给消费者; * false:将消息丢弃 */ channel.basicNack(tag, false, true); } } }

    消费者确认机制

    通过acknowledge-mode进行消费确认配置,可选值有none、auto、manual。

    none:MQ会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正的消费到了消息。auto:消费者在接收到消息后进行业务处理未发生异常时,由spring-rabbit自动发送ack(无异常)到MQ,否则发送nack(异常)。manual:需要人为地根据业务处理的结果,获取到channel之后调用方法向MQ发送ack(或消费失败时的nack)信息。

    有ack的模式下,需要考虑setDefaultRequeueRejected(false),否则当消费消息抛出异常没有catch住时,这条消息会被rabbitmq放回到queue头部,再被推送过来,然后再抛异常再放回…死循环了。设置false的作用是抛异常时不放回,而是直接丢弃,所以可能需要对这条消息做处理,以免丢失。

    测试

    ConfirmCallback测试

    只启动生产者,此时MQ上不会创建Exchange 通过测试接口将消息发送到TEST_EXCHANGE = “test_exchange.direct”; 可以看到,第一次发送消息返回404 NOT_FOUND - no exchange ‘test_exchange.direct’ in vhost ‘/’ 间隔5s后进行了第一次重试,然后间隔10s又进行了两次重试。 查看数据库保存的错误信息,重试此数已经达到了3次,此时该消息不再尝试投递,避免故障未排除时程序产生大量消息一直重试造成资源消耗。 启动消费者后,修改重试次数为1,该消息重新发送投递。

    ReturnCallback测试

    启动消费者,修改生产者Exchange 的routingkey为其它值后重启。 通过测试可以发现,消息已经成功投递到了Exchange,但是MsgSendReturnCallBack返回了错误信息 将routingkey恢复后重启生产者,再次测试,两端都成功了。

    消费端重试机制测试

    @RabbitListener底层会使用AOP拦截,代码正常结束则会自动提交事务,但是如果有异常抛出,则会自动实现补偿,消息会一直缓存到消息服务器中,一直到不抛出异常。

    需要注意的是,无论是手动重试还是自动重试,都应该指定重试次数,以免程序出现死循环,造成资源消耗。消费失败的消息会重新回到队列的头部,造成后面的消息积压。 消费者接收到消息后,手动制造一个异常 第一次消费失败后又重试了三次,达到了配置的最大重试次数后不再重试,直接丢弃。

    Processed: 0.032, SQL: 11