TP使用RabbitMq队列

    技术2022-07-17  74

    TP使用RabbitMq队列

    最近在学习 RabbitMQ,看了很多文章,写得都比较乱,于是转载了一篇比较不错的。我用的是 Windows 10,所以需要先安装 Erlang,然后再安装 RabbitMQ 服务端,最后在 TP 根目录使用 composer 安装扩展:

    composer require php-amqplib/php-amqplib

    <?php
    // Consumer side. Must be run as a long-lived (daemon) process.

    use PhpAmqpLib\Connection\AMQPStreamConnection;

    /**
     * Consumes messages from the "msgs" queue, which is bound to the
     * "router" direct exchange, and echoes each message body.
     */
    class MessageConsume
    {
        protected $consumerTag = 'consumer';
        protected $exchange = 'router';
        protected $queue = 'msgs';

        /**
         * Close the channel and then the connection.
         *
         * Registered by start() as a PHP shutdown function, so it has to stay
         * publicly callable.
         *
         * @param object $channel    Channel obtained from $connection->channel().
         * @param object $connection The AMQPStreamConnection opened in start().
         */
        public function shutdown($channel, $connection)
        {
            try {
                $channel->close();
            } finally {
                // Release the socket even when closing the channel fails.
                $connection->close();
            }
        }

        /**
         * Callback invoked for every delivered message.
         *
         * Echoes the body (unless it is the literal string 'quit'), then
         * acknowledges the delivery. A 'quit' message additionally cancels
         * this consumer, which empties the channel's callback list and lets
         * the wait loop in start() terminate.
         *
         * NOTE(review): newer php-amqplib releases appear to deprecate
         * delivery_info in favour of accessor methods - confirm against the
         * installed version before migrating.
         *
         * @param object $message Delivered message exposing ->body and ->delivery_info.
         */
        public function process_message($message)
        {
            $info = $message->delivery_info;
            $isQuit = ($message->body === 'quit');

            if (!$isQuit) {
                echo $message->body;
            }

            // Manual acknowledgement (the consumer is registered with no_ack = false).
            $info['channel']->basic_ack($info['delivery_tag']);

            if ($isQuit) {
                $info['channel']->basic_cancel($info['consumer_tag']);
            }
        }

        /**
         * Start consuming; intended to run as a daemon process.
         *
         * Run from the public directory:
         *   php index.php index/message_consume/start &
         */
        public function start()
        {
            $host = '127.0.0.1';
            $port = 5672;
            $user = 'guest';
            $pwd = 'guest';
            $vhost = '/';

            $connection = new AMQPStreamConnection($host, $port, $user, $pwd, $vhost);
            $channel = $connection->channel();

            // Arguments after the name: passive = false, durable = true,
            // exclusive = false, auto_delete = false.
            $channel->queue_declare($this->queue, false, true, false, false);
            // Arguments after the type: passive = false, durable = true,
            // auto_delete = false.
            $channel->exchange_declare($this->exchange, 'direct', false, true, false);
            $channel->queue_bind($this->queue, $this->exchange);

            // Flags: no_local = false, no_ack = false (manual ack),
            // exclusive = false, nowait = false.
            $channel->basic_consume(
                $this->queue,
                $this->consumerTag,
                false,
                false,
                false,
                false,
                array($this, 'process_message')
            );

            register_shutdown_function(array($this, 'shutdown'), $channel, $connection);

            // Block until every consumer callback has been cancelled.
            while (count($channel->callbacks)) {
                $channel->wait();
            }
        }
    }

    <?php
    // Publisher side. This is a separate file from the consumer above
    // (hence the second opening tag and the repeated `use` statement).

    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;

    /**
     * Publishes persistent text messages to the "router" exchange.
     */
    class MessageQueue
    {
        const exchange = 'router';
        const queue = 'msgs';

        /**
         * Publish one message.
         *
         * The channel and the connection are always closed, including when
         * declaring the topology or publishing throws; the exception then
         * propagates to the caller.
         *
         * @param string $data Message body to publish.
         * @return string The literal "ok" once the message has been handed to the broker.
         */
        public static function pushMessage($data)
        {
            $host = '127.0.0.1';
            $port = 5672;
            $user = 'guest';
            $pwd = 'guest';
            $vhost = '/';

            $connection = new AMQPStreamConnection($host, $port, $user, $pwd, $vhost);

            try {
                $channel = $connection->channel();

                try {
                    // Same declarations as the consumer, so whichever side
                    // starts first creates the queue, exchange and binding.
                    $channel->queue_declare(self::queue, false, true, false, false);
                    $channel->exchange_declare(self::exchange, 'direct', false, true, false);
                    $channel->queue_bind(self::queue, self::exchange);

                    $message = new AMQPMessage(
                        $data,
                        array(
                            'content_type' => 'text/plain',
                            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                        )
                    );
                    $channel->basic_publish($message, self::exchange);
                } finally {
                    $channel->close();
                }
            } finally {
                $connection->close();
            }

            return "ok";
        }

        /**
         * Example entry point: calling pushMessage() is all that is needed to
         * send a message.
         */
        public function index()
        {
            $data = json_encode(['msg' => '测试数据', 'id' => '15']);
            self::pushMessage($data);
        }
    }

    链接: Rabbitmq各方法的作用详解.

    Processed: 0.028, SQL: 10