[Workerman]三、Workerman主动推送消息到客户端

    技术2023-10-07  111

    场景:

    数据通过http接口,发送到服务端,我们需要在http的接口,将发送过来的数据实时发送给客户端。

    方法一

    先下载一个PHPSocket.IO 测试代码下载:https://www.workerman.net/download

    新建一个run.php <?php use Workerman\Worker; use Workerman\Timer; use PHPSocketIO\SocketIO; use Workerman\Protocols\Http\Request; use Workerman\Connection\TcpConnection; include __DIR__ . '/../vendor/autoload.php'; // 全局数组保存uid在线数据 $uidConnectionMap = array(); // 记录最后一次广播的在线用户数 $last_online_count = 0; // 记录最后一次广播的在线页面数 $last_online_page_count = 0; // PHPSocketIO服务 $sender_io = new SocketIO(2120); // 客户端发起连接事件时,设置连接socket的各种事件回调 $sender_io->on('connection', function($socket){ // 当客户端发来登录事件时触发 $socket->on('login', function ($uid)use($socket){ global $uidConnectionMap, $last_online_count, $last_online_page_count; // 已经登录过了 if(isset($socket->uid)){ return; } // 更新对应uid的在线数据 $uid = (string)$uid; if(!isset($uidConnectionMap[$uid])) { $uidConnectionMap[$uid] = 0; } // 这个uid有++$uidConnectionMap[$uid]个socket连接 ++$uidConnectionMap[$uid]; // 将这个连接加入到uid分组,方便针对uid推送数据 $socket->join($uid); $socket->uid = $uid; // 更新这个socket对应页面的在线数据 // $socket->emit('update_online_count', "当前<b>{$last_online_count}</b>人在线,共打开<b>{$last_online_page_count}</b>个页面"); }); // 当客户端断开连接是触发(一般是关闭网页或者跳转刷新导致) $socket->on('disconnect', function () use($socket) { if(!isset($socket->uid)) { return; } global $uidConnectionMap, $sender_io; // 将uid的在线socket数减一 if(--$uidConnectionMap[$socket->uid] <= 0) { unset($uidConnectionMap[$socket->uid]); } }); }); // 当$sender_io启动后监听一个http端口,通过这个端口可以给任意uid或者所有uid推送数据 $sender_io->on('workerStart', function(){ // 监听一个http端口 $inner_http_worker = new Worker('http://0.0.0.0:2121'); // 当http客户端发来数据时触发 $inner_http_worker->onMessage = function(TcpConnection $http_connection, Request $request){ global $uidConnectionMap; $post = $request->post(); $post = $post ? $post : $request->get(); // 推送数据的url格式 type=publish&to=uid&content=xxxx switch(@$post['type']){ case 'publish': global $sender_io; $to = @$post['to']; $post['content'] = htmlspecialchars(@$post['content']); // 有指定uid则向uid所在socket组发送数据 if($to){ $sender_io->to($to)->emit('new_msg', $post['content']); // 否则向所有uid推送数据 }else{ $sender_io->emit('new_msg', @$post['content']); } // http接口返回,如果用户离线socket返回fail if($to && !isset($uidConnectionMap[$to])){ return $http_connection->send('offline'); }else{ return $http_connection->send('ok'); } } return $http_connection->send('fail'); }; // 执行监听 $inner_http_worker->listen(); // 一个定时器,定时向所有uid推送当前uid在线数及在线页面数 Timer::add(1, function(){ global $uidConnectionMap, $sender_io, $last_online_count, $last_online_page_count; $online_count_now = count($uidConnectionMap); $online_page_count_now = array_sum($uidConnectionMap); // 只有在客户端在线数变化了才广播,减少不必要的客户端通讯 if($last_online_count != $online_count_now || $last_online_page_count != $online_page_count_now) { // $sender_io->emit('update_online_count', "当前<b>{$online_count_now}</b>人在线,共打开<b>{$online_page_count_now}</b>个页面"); $last_online_count = $online_count_now; $last_online_page_count = $online_page_count_now; } }); }); if(!defined('GLOBAL_START')) { Worker::runAll(); }

    跑起来:

    php run.php 新建一个数据发送接口:send.php <?php // 指明给谁推送,为空表示向所有在线用户推送 $to_uid = "123"; // 推送的url地址 $push_api_url = "http://127.0.0.1:2121/"; $post_data = array( "type" => "publish", "content" => "数据", "to" => $to_uid, ); $ch = curl_init (); curl_setopt ( $ch, CURLOPT_URL, $push_api_url ); curl_setopt ( $ch, CURLOPT_POST, 1 ); curl_setopt ( $ch, CURLOPT_HEADER, 0 ); curl_setopt ( $ch, CURLOPT_RETURNTRANSFER, 1 ); curl_setopt ( $ch, CURLOPT_POSTFIELDS, $post_data ); curl_setopt ($ch, CURLOPT_HTTPHEADER, array("Expect:")); $return = curl_exec ( $ch ); curl_close ( $ch ); var_export($return); 新建一个显示页面 show.html <!DOCTYPE html> <html> <head> <meta http-equiv="content-type" content="text/html;charset=utf-8"> <script src='https://cdn.bootcss.com/socket.io/2.0.3/socket.io.js'></script> <!--<script src='./web/socket.io.js'></script>--> </head> <body> <div class="wrapper"> <script> // 连接服务端 var socket = io('http://127.0.0.1:2120'); // uid可以是自己网站的用户id,以便针对uid推送 uid = 123; // socket连接后以uid登录 socket.on('connect', function(){ socket.emit('login', uid); }); // 后端推送来消息时 socket.on('new_msg', function(msg){ console.log("收到消息:"+msg); //自己业务逻辑处理 }); </script> </div> </body> </html> 测试 打开 show.html,打开控制台。运行send.php。可以看到控制台有输出数据

    方法二

    <?php use Workerman\Worker; use Workerman\Lib\Timer; use think\cache\driver\Redis; include __DIR__ . '/../vendor/autoload.php'; ini_set('default_socket_timeout', -1); //不超时 $ws_worker = new Worker("websocket://0.0.0.0:2120"); // 启动4个进程对外提供服务 $ws_worker->count = 4; // 当收到客户端发来的数据后返回hello $data给客户端 $ws_worker->onMessage = function ($connection, $data) { // 向客户端发送hello $data // $connection->send('hello ' . $data); $connection->send('连接成功'); }; $marketId = 450502001; $channelId = 'F09F4687-B652-4533-B47A-C1F9087F2E15'; $redis = new Redis(); $redisKey = $marketId . ':' . $channelId . ':' . 'whiteList'; //$res = $redis->get($redisKey); $data = [ 'cmd' => 2, 'data' => [ 'market_id' => $marketId, 'channel_id' => $channelId, 'desc' => '', ] ]; // 进程启动后定时推送数据给客户端 $ws_worker->onWorkerStart = function ($ws_worker) use ($redis, $redisKey, $data) { Timer::add(1, function () use ($ws_worker, $redis, $redisKey, $data) { $vehicleNo = $redis->get($redisKey); // $vehicleNo = '桂A2333'; if (!empty($vehicleNo)) { $data['data']['vehicle_no'] = $vehicleNo; $sendMessage = json_encode($data); foreach ($ws_worker->connections as $connection) { $connection->send($sendMessage); $redis->set($redisKey, null); } } }); }; // 运行worker Worker::runAll();

    打开控制台,输入下面的js代码

    // 假设服务端ip为127.0.0.1 ws = new WebSocket("ws://127.0.0.1:2000"); ws.onopen = function() { alert("连接成功"); ws.send('tom'); alert("给服务端发送一个字符串:tom"); }; ws.onmessage = function(e) { alert("收到服务端的消息:" + e.data); };
    Processed: 0.014, SQL: 9