RabbitMQ可靠消息队列实现
安装架构核心概念ServerConnectionExchangeBindingRouting keyQueue
SpringBoot整合SpringBoot版本maven谁来创建队列生产者ymlRabbitMqConfigMessageProducerConfirmCallbackReturnCallback队列消息信息SQL常量重新投递
消费者自动确认ymlMqConsumer
消费者手动动确认ymlMqConsumer
消费者确认机制测试ConfirmCallback测试ReturnCallback测试消费端重试机制测试
安装
基于Dokcer容器
架构
核心概念
Server
又称Broker, 接受客户端的连接,实现AMQP实体服务,这里指RabbitMQ 服务器
Connection
连接,应用程序与Broker的网络连接。
Channel:网络信道,几乎所有的操作都在 Channel 中进行,Channel是进行消息读写的通道。客户端可建立多个Channel:,每个Channel代表一个会话任务。Virtual host:虚拟主机,用于逻辑隔离。一个虚拟主机里面可以有若干个Exchange和Queue,同一个虚拟主机里面不能有相同名称的Exchange或Queue。
Exchange
交换器,接收消息,按照路由规则将消息路由到一个或者多个队列。如果路由不到,或者返回给生产者,或者直接丢弃。RabbitMQ常用的交换器常用类型有direct、topic、fanout、headers四种。
Direct Exchange:该类型的交换器将所有发送到该交换器的消息被转发到RoutingKey指定的队列中,也就是说路由到BindingKey和RoutingKey完全匹配的队列中。 Topic Exchange:该类型的交换器将所有发送到Topic Exchange的消息被转发到所有RoutingKey中指定的Topic的队列上面。 Exchange将RoutingKey和某Topic进行模糊匹配,其中“*”用来匹配一个词,“#”用于匹配一个或者多个词。例如“com.#”能匹配到“com.rabbitmq.oa”和“com.rabbitmq”;而"com.*"只能匹配到“com.rabbitmq”。 Fanout Exchange:该类型不处理路由键,会把所有发送到交换器的消息路由到所有绑定的队列中。优点是转发消息最快,性能最好。 Headers Exchange:该类型的交换器不依赖路由规则来路由消息,而是根据消息内容中的headers属性进行匹配。headers类型交换器性能差,在实际中并不常用。
Binding
Exchange 和 Queue 之间的虚拟连接,binding 中可以包含 routing key。
Routing key
路由规则,虚拟机可用它来确定如何路由一个特定消息,即交换机绑定到 Queue 的键。
Queue
也称为Message Queue,消息队列,保存消息并将它们转发给消费者。
SpringBoot整合
SpringBoot版本
2.2.7.RELEASE
maven
<dependency
>
<groupId
>org
.springframework
.boot
</groupId
>
<artifactId
>spring
-boot
-starter
-amqp
</artifactId
>
</dependency
>
谁来创建队列
消息重要时,生产者和消费者都要创建队列。避免造成消息延迟或丢失。 消息不重要时,消费者创建队列。避免造成在没有消费者时,生产者发送的消息在MQ积压。
生产者
1.生产者生产消息后投递到队列之前,先将消息信息保存到数据库,状态为发送中。 2.队列将消息投递到Exchange,并触发ConfirmCallback回调。 3.判断ConfirmCallback回调返回的ack信息,为true时删除DB中对应的信息,为false时修改状态为发送失败。 4.Exchange通过匹配Routing key将消息转发到指定的队列中。 5.没有匹配的队列时通过ReturnCallback通知生产者,记录日志或保存到死信队列。 6.启动定时任务将数据库中ConfirmCallback失败的消息重新投递到Exchange,为防止死循环应该设置一个最大重试次数。
yml
spring
:
rabbitmq
:
addresses
: ***.***.***.***
port
: 5672
username
: admin
password
: ******
#开启 confirm 确认机制
publisher
-confirm
-type
: correlated
#开启
return 确认机制
publisher
-returns
: true
template
:
#设置为
true 后 消费者在消息没有被路由到合适队列情况下会被
return监听,而不会自动删除
mandatory
: true
RabbitMqConfig
@Configuration
public class RabbitMqConfig {
@Autowired
private MsgSendReturnCallBack msgSendReturnCallBack
;
@Autowired
private MsgSendConfirmCallBack msgSendConfirmCallBack
;
@Bean
public RabbitTemplate
rabbitTemplate(ConnectionFactory connectionFactory
) {
RabbitTemplate template
= new RabbitTemplate(connectionFactory
);
template
.setConfirmCallback(msgSendConfirmCallBack
);
template
.setReturnCallback(msgSendReturnCallBack
);
return template
;
}
}
MessageProducer
@Slf4j
@Service
public class MessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate
;
@Autowired
private MsgBroberService msgBroberService
;
public void send(String exchange
, String routingKey
, Object obj
) {
String uuid
= UUID.randomUUID().toString();
CorrelationData correlationId
= new CorrelationData(uuid
);
String msgJson
= JsonMapper
.INSTANCE.toJson(obj
);
Message message
= MessageBuilder
.withBody(msgJson
.getBytes()).build();
message
.getMessageProperties().setDeliveryMode(MessageDeliveryMode
.PERSISTENT);
message
.getMessageProperties().setContentType(MessageProperties
.CONTENT_TYPE_JSON);
MsgBrober msgBrober
= MsgBrober
.builder()
.corrId(uuid
)
.message(msgJson
)
.status(MsgBroberConstant
.SENDING_STATUS)
.exchange(exchange
)
.rountingKey(routingKey
)
.build();
msgBroberService
.save(msgBrober
);
try {
rabbitTemplate
.convertAndSend(exchange
, routingKey
, message
, correlationId
);
log
.info("消息正在发送。。。");
} catch (Exception e
) {
log
.error("消息发送失败, e={}", e
.getMessage());
msgBroberService
.update(Wrappers
.<MsgBrober
>lambdaUpdate()
.set(MsgBrober
::getReason
, e
.getMessage())
.set(MsgBrober
::getStatus
, MsgBroberConstant
.FAIL_STATUS)
.eq(MsgBrober
::getCorrId
, uuid
));
}
}
public void send(MsgBrober msgBrober
) {
CorrelationData correlationId
= new CorrelationData(msgBrober
.getCorrId());
Message message
= MessageBuilder
.withBody(msgBrober
.getMessage().getBytes()).build();
message
.getMessageProperties().setDeliveryMode(MessageDeliveryMode
.PERSISTENT);
message
.getMessageProperties().setContentType(MessageProperties
.CONTENT_TYPE_JSON);
rabbitTemplate
.convertAndSend(msgBrober
.getExchange(), msgBrober
.getRountingKey(),
message
, correlationId
);
}
}
ConfirmCallback
@Slf4j
@Component
public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {
@Autowired
private MsgBroberService msgBroberService
;
@Override
public void confirm(CorrelationData correlationData
, boolean ack
, String cause
) {
if (ack
) {
log
.info("消息已成功投递,MsgSendConfirmCallBack, 回调id={}, ack={}, returnedMessage={}",correlationData
, ack
,
correlationData
.getReturnedMessage());
msgBroberService
.remove(Wrappers
.<MsgBrober
>lambdaQuery().eq(MsgBrober
::getCorrId
, correlationData
.getId()));
} else {
log
.error("MsgSendConfirmCallBack, 回调id={}, ack={}, returnedMessage={}", correlationData
, ack
,
correlationData
.getReturnedMessage());
msgBroberService
.update(Wrappers
.<MsgBrober
>lambdaUpdate()
.set(MsgBrober
::getStatus
, MsgBroberConstant
.FAIL_STATUS)
.set(MsgBrober
::getReason
, cause
)
.eq(MsgBrober
::getCorrId
, correlationData
.getId()));
}
}
}
ReturnCallback
@Slf4j
@Component
public class MsgSendReturnCallBack implements RabbitTemplate.ReturnCallback {
@Override
public void returnedMessage(Message message
, int replyCode
, String replyText
, String exchange
, String routing
) {
log
.error("消息主题 message={}", message
);
log
.error("回复代码 reply={}", replyCode
);
log
.error("回复信息 replyText={}", replyText
);
log
.error("使用的 exchange={}", exchange
);
log
.error("路由 routing={}", routing
);
}
}
队列消息信息
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@
TableName(value
= "m_msg_broker")
public class MsgBrober {
@
TableId(type
= IdType
.AUTO)
private int id
;
private int retry
;
private int status
;
private String reason
;
private String corrId
;
private String message
;
private String exchange
;
private String rountingKey
;
private Date createTime
;
private Date updateTime
;
}
SQL
-- ----------------------------
-- Table structure
for m_msg_broker
-- ----------------------------
DROP TABLE IF EXISTS `m_msg_broker`;
CREATE TABLE `m_msg_broker` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`retry` tinyint(4) NOT NULL DEFAULT 0 COMMENT '尝试投递次数',
`status` tinyint(4) NOT NULL DEFAULT 1 COMMENT '此时的消息的状态',
`reason` varchar(1000) CHARACTER SET utf8
COLLATE utf8_general_ci
NOT NULL DEFAULT '' COMMENT '失败原因',
`corr_id` varchar(64) CHARACTER SET utf8
COLLATE utf8_general_ci
NOT NULL DEFAULT '' COMMENT '消息唯一标识',
`message` varchar(10000) CHARACTER SET utf8
COLLATE utf8_general_ci
NOT NULL DEFAULT '' COMMENT '消息体',
`exchange` varchar(20) CHARACTER SET utf8
COLLATE utf8_general_ci
NOT NULL DEFAULT '' COMMENT '交换器',
`rounting_key` varchar(20) CHARACTER SET utf8
COLLATE utf8_general_ci
NOT NULL DEFAULT '' COMMENT '路由键',
`create_time` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0),
`update_time` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) ON UPDATE CURRENT_TIMESTAMP(0),
PRIMARY KEY (`id`) USING BTREE,
UNIQUE INDEX `msg_broker_corr_id_uindex`(`corr_id`) USING BTREE
) ENGINE = InnoDB
AUTO_INCREMENT = 1 CHARACTER SET = utf8
COLLATE = utf8_general_ci
COMMENT = 'rabbit消息' ROW_FORMAT = Dynamic
;
SET FOREIGN_KEY_CHECKS = 1;
常量
public class MsgBroberConstant {
public static final int
SENDING_STATUS = 0;
public static final int
SUCCESS_STATUS = 1;
public static final int
FAIL_STATUS = 2;
public static final int
NOT_FOUNT = -1;
public static final int
FIRST_RETRY = 1;
public static final int
MAX_RETRY = 3;
}
public class RabbitConstant {
public static final String
TEST_EXCHANGE = "test_exchange.direct";
public static final String
CARIN_QUEUE = "carin";
public static final String
CARIN_KEY = "carin_key";
}
重新投递
@Slf4j
@EnableScheduling
@Component
public class RetryMessageTasker {
@Autowired
private MsgBroberService msgBroberService
;
@Autowired
private MessageProducer messageProducer
;
@
Scheduled(initialDelay
= 5 * 1000L
, fixedDelay
= 10 * 1000L
)
public void retrySendFailMessage() {
List
<MsgBrober
> list
= msgBroberService
.list(
Wrappers
.<MsgBrober
>lambdaQuery()
.eq(MsgBrober
::getStatus
, MsgBroberConstant
.FAIL_STATUS)
.lt(MsgBrober
::getRetry
, MsgBroberConstant
.MAX_RETRY)
);
if (!CollectionUtils
.isEmpty(list
)) {
list
.stream().forEach(
msg
-> {
try {
messageProducer
.send(msg
);
log
.info("失败消息重发中。。。");
} catch (Exception e
) {
log
.error("消息重发失败");
msg
.setReason(e
.getMessage());
}
msg
.setRetry(msg
.getRetry() + 1);
msgBroberService
.updateById(msg
);
});
}
}
}
消费者自动确认
yml
spring
:
rabbitmq
:
addresses
: ***.***.***.***
port
: 5672
username
: admin
password
: *****
listener
:
simple
:
acknowledge
-mode
: auto #消费端自动确认
retry
:
enabled
: true #开启消费端重试机制
max
-attempts
: 4 #最大重试次数
,包含首次消费消息
initial
-interval
: 3000 #每次间隔时间
multiplier
: 2 #应用于前一重试间隔的乘法器。
default-requeue
-rejected
: true # 重试次数超过上面的设置之后是否丢弃(
false不丢弃时需要写相应代码将该消息加入死信队列)
MqConsumer
@Slf4j
@Component
public class MqConsumer {
@
RabbitListener(bindings
= @
QueueBinding(
exchange
= @
Exchange(name
= RabbitConstant
.TEST_EXCHANGE, type
= ExchangeTypes
.DIRECT,durable
= "true", autoDelete
= "false"),
key
= RabbitConstant
.TEST_KEY,
value
= @
Queue(name
= RabbitConstant
.TEST_QUEUE, durable
= "true", exclusive
= "false", autoDelete
= "false")
), concurrency
= "1-10")
public void test(@Payload String body
) {
log
.info("接收到消息,body={}", body
);
}
消费者手动动确认
yml
spring
:
rabbitmq
:
addresses
: ***.***.***.***
port
: 5672
username
: admin
password
: *****
listener
:
simple
:
acknowledge
-mode
: manual
MqConsumer
@
RabbitListener(bindings
= @
QueueBinding(
exchange
= @
Exchange(name
= RabbitConstant
.TEST_EXCHANGE, type
= ExchangeTypes
.DIRECT, durable
= "true", autoDelete
= "false"),
key
= RabbitConstant
.TEST_KEY,
value
= @
Queue(name
= RabbitConstant
.TEST_QUEUE, durable
= "true", exclusive
= "false", autoDelete
= "false")
), concurrency
= "1-10")
public void test(Channel channel
,@Payload String body
, @
Header(AmqpHeaders
.DELIVERY_TAG) long tag
,@
Header(AmqpHeaders
.REDELIVERED) boolean redelivered
) throws IOException
{
log
.info("接收到消息,body={}", body
);
try {
int i
= 10 / 0;
channel
.basicAck(tag
, true);
} catch (Exception e
) {
if (redelivered
) {
log
.error("消息已重复处理失败,拒绝再次接收,{}", body
);
channel
.basicReject(tag
, false);
} else {
log
.error("carIn message Error", e
);
channel
.basicNack(tag
, false, true);
}
}
}
消费者确认机制
通过acknowledge-mode进行消费确认配置,可选值有none、auto、manual。
none:MQ会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正的消费到了消息。auto:消费者在接收到消息后进行业务处理未发生异常时,由spring-rabbit自动发送ack(无异常)到MQ,否则发送nack(异常)。manual:需要人为地根据业务处理的结果,获取到channel之后调用方法向MQ发送ack(或消费失败时的nack)信息。
有ack的模式下,需要考虑setDefaultRequeueRejected(false),否则当消费消息抛出异常没有catch住时,这条消息会被rabbitmq放回到queue头部,再被推送过来,然后再抛异常再放回…死循环了。设置false的作用是抛异常时不放回,而是直接丢弃,所以可能需要对这条消息做处理,以免丢失。
测试
ConfirmCallback测试
只启动生产者,此时MQ上不会创建Exchange 通过测试接口将消息发送到TEST_EXCHANGE = “test_exchange.direct”; 可以看到,第一次发送消息返回404 NOT_FOUND - no exchange ‘test_exchange.direct’ in vhost ‘/’ 间隔5s后进行了第一次重试,然后间隔10s又进行了两次重试。 查看数据库保存的错误信息,重试此数已经达到了3次,此时该消息不再尝试投递,避免故障未排除时程序产生大量消息一直重试造成资源消耗。 启动消费者后,修改重试次数为1,该消息重新发送投递。
ReturnCallback测试
启动消费者,修改生产者Exchange 的routingkey为其它值后重启。 通过测试可以发现,消息已经成功投递到了Exchange,但是MsgSendReturnCallBack返回了错误信息 将routingkey恢复后重启生产者,再次测试,两端都成功了。
消费端重试机制测试
@RabbitListener底层会使用AOP拦截,代码正常结束则会自动提交事务,但是如果有异常抛出,则会自动实现补偿,消息会一直缓存到消息服务器中,一直到不抛出异常。
需要注意的是,无论是手动重试还是自动重试,都应该指定重试次数,以免程序出现死循环,造成资源消耗。消费失败的消息会重新回到队列的头部,造成后面的消息积压。 消费者接收到消息后,手动制造一个异常 第一次消费失败后又重试了三次,达到了配置的最大重试次数后不再重试,直接丢弃。