13.task进程

    技术2022-07-11  89

    task进程

    swoole 结构回顾task进程介绍task初体验实例task-ipc-mode的消息队列通信模式task问题task任务切分

    swoole 结构回顾

    task进程介绍

    Task进程是独立与worker进程的一个进程.他主要处理耗时较长的业务逻辑.并且不影响worker进程处理客户端的请求,这大大提高了swoole的并发能力当有耗时较长的任务时,worker进程通过task()函数把数据投递到Task进程 去处理

    适合运用场景

    情景一:管理员需要给100W用户发送邮件,当点击发送,浏览器会一直转圈,直到邮件全部发送完毕。情景二:千万微博大V发送一条微博,其关注的粉丝相应的会接收到这个消息,是不是大V需要一直等待消息发送完成,才能执行其它操作情景三:处理几TB数据情景四: 数据库插入

    task初体验实例

    task.php

    $serv = new Swoole\Server("127.0.0.1", 9501, SWOOLE_BASE); $serv->set(array('worker_num' => 2, 'task_worker_num' => 4,)); $serv->on('Receive', function(Swoole\Server $serv, $fd, $from_id, $data) { echo "接收数据" . $data . "\n"; $data = trim($data); $task_id = $serv->task($data, 0); $serv->send($fd, "分发任务,任务id为$task_id\n"); }); $serv->on('Task', function (Swoole\Server $serv, $task_id, $from_id, $data) { echo "Tasker进程接收到数据"; echo "#{$serv->worker_id}\tonTask: [PID={$serv->worker_pid}]: task_id=$task_id, data_len=" . strlen($data) . "." . PHP_EOL; $serv->finish($data); }); $serv->on('Finish', function (Swoole\Server $serv, $task_id, $data) { echo "Task#$task_id finished, data_len=" . strlen($data) . PHP_EOL; }); $serv->on('workerStart', function($serv, $worker_id) { global $argv; if ($worker_id >= $serv->setting['worker_num']) { swoole_set_process_name("php {$argv[0]}: task_worker"); } else { swoole_set_process_name("php {$argv[0]}: worker"); } }); $serv->start();

    task-ipc-mode的消息队列通信模式

    使用unix socket通信,默认模式使用消息队列通信使用消息队列通信,并设置为争抢模式

    Task传递数据的大小问题

    数据小于8k直接通过管道传递,数据大于8k写入临时文件传递onTask会读取这个文件,把他读出来 因为没有消费完成会遗留,不过这只是会在意外情况下产生的;注意swoole及时重启了也不会消费;这是在模式1的情况下产生的; 而模式2:会使用系统的消息队列通信;不过要注意它还是会存在可能的临时文件,重点是重启之后是否会去处理;当然实际上目前的设置之后还是不会处理的,还需要一个额外的参数来指定 message_queue_key: https://wiki.swoole.com/wiki/page/346.html 通过这个参数来进行设置,如下为设置方式:

    $key = ftok(__DIR__, 1); $serv->set([ 'worker_num' => 2, 'task_worker_num' => 4, 'task_ipc_mode' => 2, 'message_queue_key' => $key] );

    task问题

    如果投递的任务量总是大于task进程的处理能力,建议适当的调大task_worker_num的数量,增加task进程数,不然一旦task塞满缓冲区,就会导致worker进程阻塞,所以需要使用好task前期必须有所规划

    task对worker的影响

    关于task_worker的个数问题:

    除此之外还需要注意的是task可能会影响到worker的工作性能,我们可以看官方的中所提到的

    swoole_task_worker.php

    <?php $host = "0.0.0.0:9501\n"; $serv = new Swoole\Server("0.0.0.0", 9501, SWOOLE_BASE); $key = ftok(__DIR__, 1); $serv->set(array('worker_num' => 2, 'task_worker_num' => 4, 'task_ipc_mode' => 2, 'message_queue_key' => $key, // 处理tcp打包问题 'open_length_check' => true, 'package_max_length' => 1024 * 1024 * 3, 'package_length_type' => 'N', 'package_length_offset' => 0, 'package_body_offset' => 4,)); $serv->on('Receive', function(Swoole\Server $serv, $fd, $from_id, $data) { $task_id = $serv->task($data); echo "异步事情\n"; $serv->send($fd, "分发任务,任务id为$task_id\n"); }); $serv->on('Task', function (Swoole\Server $serv, $task_id, $from_id, $data) { $serv->finish($data); }); $serv->on('Finish', function (Swoole\Server $serv, $task_id, $data) { }); echo $host; $serv->start();

    swoole_tcp_client.php

    <?php $client = new swoole_client(SWOOLE_SOCK_TCP); //连接到服务器 $client->connect('127.0.0.1', 9501, 0.5); //向服务器发送数据 $body = 'a'; $send = pack('N', strlen($body)) . $body; for ($i = 0; $i < 100; $i++) { $client->send($send); }//从服务器接收数据 $data = $client->recv(); echo $data . "\n"; //关闭连接 $client->close(); echo "其他事情\n";

    其余问题

    task_max_request 设置task进程的最大任务数。一个task进程在处理完超过此数值的任务后将自动退出。这个参数是为了防止PHP进程内存溢出。如果不希望进程自动退出可以设置为0每个woker都有可能投递任务给不同的task_worker处理, 不同的woker进程内存隔离,记录着worker_id, 标识woker进程任务处理数量

    task任务切分

    模拟信息读取短息发送 2: 从200w的MySQL中读取数据,获取到手机号码然后对于用户进行短息发送,通知xxxx事情,然后为了与友好根据性别进行判断‘先生’,‘女士’ 首先第一件事情就是先导入测试的数据与设置MySQL的外部访问的权限-》为了方便导入数据

    <?php require 'db.php'; $host = "0.0.0.0:9501\n"; $serv = new Swoole\Server("0.0.0.0", 9501, SWOOLE_BASE); $key = ftok(__DIR__, 1); $serv->set( array( 'worker_num' => 2, 'task_worker_num' => 4, 'task_ipc_mode' => 2, 'message_queue_key' => $key, // 处理tcp打包问题 'open_length_check' => true, 'package_max_length' => 1024 * 1024 * 3, 'package_length_type' => 'N', 'package_length_offset' => 0, 'package_body_offset' => 4 ) ); $serv->on('Receive', function(Swoole\Server $serv, $fd, $from_id, $data) use ($db) { for ($i = 0; $i < 4; $i++) { $data = $db->query('select id,gender,name,mobile from customers where id > ' . ($i * 500000) . ' and id <= ' . (($i + 1) * 500000)); $task_id = $serv->task($data, $i); unset($data); }$serv->send($fd, "分发任务\n"); }); $serv->on('Task', function (Swoole\Server $serv, $task_id, $from_id, $data) { echo "处理: " . $from_id . " 的任务,任务id为" . $task_id . "数据信息量" . count($data) . "\n"; echo "当前处理进程:" . posix_getpid() . "\n"; // 就当做发送短信 file_put_contents(__DIR__ . '/task_' . $task_id . '.log', ''); foreach ($data as $key => $value) { try { if ($value['gender'] == 0) { // 女士 $string = "尊敬的手机号为" . $value['mobile'] . "的" . $value['name'] . "_女士_你好,然后xxxxxx省....\n"; } else { // 先生 $string = "尊敬的手机号为" . $value['mobile'] . "的" . $value['name'] . "_先生_你好,然后xxxxxx省....\n"; } // 不打印就记录 -- 因为信息太多 file_put_contents(__DIR__ . '/task_' . $task_id . '.log', $string, 8); } catch (\Exception $e) { // $serv->sendMessage($value, 1); } }$serv->finish($data); }); $serv->on('PipeMessage', function (Swoole\Server $server, $src_worker_id, $message) { echo "来自" . $src_worker_id . "的信息\n"; var_dump($message); }); echo $host; $serv->start();
    Processed: 0.013, SQL: 9