Hyperf框架官方支持了Amqp,但是只是具备了基础发消息和接受消息。对于我们经常使用的延迟队列却不支持,这让人感到痛苦。
由于Rabbitmq默认没有支持延迟队列,需要使用官方的TTL和死信队列来实现我们的延迟队列功能.
1、rabbitmq 可以针对 Queue和Message 设置 x-message-ttl 来控制消息的生存时间,如果超时,消息变为 dead letter
2、rabbitmq 的queue 可以配置 x-dead-letter-exchange 和 x-dead-letter-routing(可选) 两个参数,来控制队列出现 dead letter 的时候,重新发送消息的目的地
1、设置了 x-dead-letter-exchange 和 x-dead-letter-routing 后的队列是根据队列入队的顺序进行消费,即使到了过期时间也不会触发x-dead-letter-exchange因为过期时间是在消息出队列的时候进行判断的
2、所以当队列没有设过期时间时,插入一个没有过期时间的消息会导致 x-dead-letter-exchange 队列永远不会被消费
通过看源码可以发现,Hyperf对Rabbitmq的官方SDK php-amqplib/php-amqplib 进行了封装。要实现延迟队列首先要了解清楚如果通过 php-amqplib/php-amqplib 实现延迟队列(参考下方 php-amqplib实现延迟队列)。
通过debug可以看到 hyperf 的producer仅仅是将消息推送至交换器就结束了。根据设计需要根据消息的过期时间建立对应的延迟queue 所以通过改造实现成下面这样:
declare(strict_types=1); namespace App\Constants\Amqp; use Hyperf\Amqp\Builder; use Hyperf\Di\Annotation\AnnotationCollector; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; class DelayProducer extends Builder { public function produce(DelayProducerMessage $producerMessage, bool $confirm = false, int $timeout = 5, $delayTime = 0): bool { return retry(1, function () use ($producerMessage, $confirm, $timeout, $delayTime) { return $this->produceMessage($producerMessage, $confirm, $timeout, $delayTime); }); } /** * @param DelayProducerMessage $producerMessage * @param bool $confirm * @param int $timeout * @param int $delayTime * @return bool * @throws \Throwable */ private function produceMessage(DelayProducerMessage $producerMessage, bool $confirm = false, int $timeout = 5, int $delayTime = 0) { $result = false; $this->injectMessageProperty($producerMessage); if ($delayTime > 0) { $message = new AMQPMessage($producerMessage->payload(), array_merge($producerMessage->getProperties(), [ 'expiration' => $delayTime * 1000, ])); } else { $message = new AMQPMessage($producerMessage->payload(), $producerMessage->getProperties()); } $pool = $this->getConnectionPool($producerMessage->getPoolName()); /** @var \Hyperf\Amqp\Connection $connection */ $connection = $pool->get(); if ($confirm) { $channel = $connection->getConfirmChannel(); } else { $channel = $connection->getChannel(); } $channel->set_ack_handler(function () use (&$result) { $result = true; }); try { $delayExchange = 'delayed_' . $producerMessage->getExchange(); $delayQueue = 'delayed_queue_' . $producerMessage->getExchange() . $producerMessage->getTtl() . '_' . $delayTime; $delayRoutingKey = $producerMessage->getRoutingKey() . $delayTime; //定义延迟交换器 $channel->exchange_declare($delayExchange, 'topic', false, true, false); //定义延迟队列 $channel->queue_declare($delayQueue, false, true, false, false, false, new AMQPTable(array( "x-dead-letter-exchange" => $producerMessage->getExchange(), "x-dead-letter-routing-key" => $producerMessage->getRoutingKey(), "x-message-ttl" => $producerMessage->getTtl() * 1000, ))); //绑定延迟队列到交换器上 $channel->queue_bind($delayQueue, $delayExchange, $delayRoutingKey); $channel->basic_publish($message, $delayExchange, $delayRoutingKey); $channel->wait_for_pending_acks_returns($timeout); } catch (\Throwable $exception) { // Reconnect the connection before release. $connection->reconnect(); throw $exception; } finally { $connection->release(); } return $confirm ? $result : true; } private function injectMessageProperty(DelayProducerMessage $producerMessage) { if (class_exists(AnnotationCollector::class)) { /** @var DelayAnnotation $annotation */ $annotation = AnnotationCollector::getClassAnnotation(get_class($producerMessage), DelayAnnotation::class); if ($annotation) { $annotation->routingKey && $producerMessage->setRoutingKey($annotation->routingKey); $annotation->exchange && $producerMessage->setExchange($annotation->exchange); $annotation->ttl && $producerMessage->setTtl($annotation->ttl); } } } }细心的同学会发现injectMessageProperty这里多了个ttl, 这是利用了官方的注释机制。
这样改造的优势在于,不用改造消息消费者,官方原生的消费者全部都支持。
直接上代码
function publish() { $connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST); $channel = $connection->channel(); $exchange_name = 'test_exchange'; $queue_name = 'test_queue'; //定义默认的交换器 $channel->exchange_declare($exchange_name, 'topic', false, true, false); //定义延迟交换器 $channel->exchange_declare('delayed_exchange', 'topic', false, true, false); //定义延迟队列 $channel->queue_declare('delayed_queue', false, true, false, false, false, new AMQPTable(array( "x-dead-letter-exchange" => "delayed_exchange", "x-dead-letter-routing-key" => "delayed_exchange", "x-message-ttl" => 5000, //5秒延迟 ))); //绑定延迟队列到默认队列上 $channel->queue_bind('delayed_queue', $exchange_name); //定义正常消费队列 $channel->queue_declare($queue_name, false, true, false, false, false); //绑定正常消费队列到延迟交换器上 $channel->queue_bind($queue_name, 'delayed_exchange', 'delayed_exchange'); //发送消息 $message = new AMQPMessage('hello', array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)); $channel->basic_publish($message, $exchange_name); $channel->close(); $connection->close(); }