在PHP的yii2框架中,有时候需要一些异步队列的功能,这篇文章主要写如何自己增加几个文件创建一个基于redis list的异步队列进程。 在yii2的框架里,要用到php-cli进程,需要将Controller放入commands文件夹中,并且使用php ./yii ...的方式进行调用。
首先创建一个Controller用于启动异步队列,代码如下:
<?php namespace app\commands\daemon; use yii\console\Controller; use Yii; class DaemonController extends Controller { private $running_children_pid = []; private $running = true; private static $_master_pid; public static $pid_file; private $alarm_times = 0; private $pid = 0; public function __construct($id, $module, $config = array()) { $this->checkPcntl(); static::$pid_file = __DIR__ . "/../../runtime/dasheng_yii_daemon.pid"; parent::__construct($id, $module, $config); } //检查环境是否支持pcntl支持 private function checkPcntl() { // Make sure PHP has support for pcntl if (!function_exists('pcntl_signal')) { $message = 'PHP does not appear to be compiled with the PCNTL extension. This is neccesary for daemonization'; echo $message; throw new \Exception($message); } } private function killallChildren() { foreach ($this->running_children_pid as $pname => $pids) { foreach ($pids as $pid) { posix_kill($pid, SIGQUIT); } } } private function sendWarningMail($msg) { try { Yii::$app->mailer->compose() ->setFrom('***') ->setTo('***') ->setSubject('daemon服务器报警') ->setHtmlBody("<html>$msg</html>") ->send(); } catch (\Exception $e) { \Yii::warning("【daemon服务器报警】发送预警邮件失败>>>" . $e->getMessage()); } } //信号处理函数 public function signalHandler($signo) { EchoFormat::out("" . $this->pid . " signalHandler($signo)"); switch ($signo) { //子进程结束信号,可能会有多个子进程结束,用同一个信号,所以while里也要wait case SIGCHLD: $stopped_pid = pcntl_wait($status, WNOHANG); EchoFormat::out($this->pid." wait pid=".$stopped_pid); $this->removePid($stopped_pid); //发送报警 if($this->running) { EchoFormat::out("running"); ++$this->alarm_times; switch ($this->alarm_times) { case 1: case 10: case 100: $ip_info = shell_exec('ifconfig'); $this->sendWarningMail("<pre>$ip_info</pre>"); break; default: break; } } break; //中断进程 case SIGTERM: case SIGHUP: case SIGQUIT: $this->running = false; EchoFormat::out("master stopping"); $this->killallChildren(); break; default: break; } EchoFormat::out($this->pid." signalHandler over"); } //删除pid private function removePid($stopped_pid) { foreach ($this->running_children_pid as $pname => $pids) { $index = array_search($stopped_pid, $pids); if ($index !== false && $index >= 0) { EchoFormat::out("unset running_children_pid[$pname][$index]"); unset($this->running_children_pid[$pname][$index]); EchoFormat::out("running_children_pid=" . print_r($this->running_children_pid, true)); if (count($this->running_children_pid[$pname]) == 0) { unset($this->running_children_pid[$pname]); } } } } private function onStart() { while ($this->running) { //echo "mgr on start\n"; $daemon_config = Yii::$app->params["daemon_config"]; $daemons = $daemon_config["daemons"]; $process_name = $daemon_config["process_name"]; foreach ($daemons as $pname => $pparams) { $controller = $pparams["controller"]; $init_params = $pparams["init_params"]; $count = intval($pparams["count"]); $key = $pparams["key"]; $action = $controller . "/run"; $running_count = 0; $pids = array(); if (isset($this->running_children_pid[$pname])) {//processes are running $pids = $this->running_children_pid[$pname]; $running_count = count($pids); } if ($running_count >= $count) {//reduce process, send signal for ($i = 0; $i < $running_count - $count; ++$i) { $stopping_pid = array_shift($pids); posix_kill($stopping_pid, SIGQUIT); EchoFormat::out("reduce daemon=" . $pname. ", pid=".$stopping_pid); } $this->running_children_pid[$pname] = $pids; } $params[0] = $init_params; $params[1] = $key; for ($i = 0; $i < $count - $running_count; ++$i) { $child_pid = pcntl_fork(); if ($child_pid < 0) { EchoFormat::out("fork error"); continue; } else if (0 == $child_pid) { //worker $this->pid = getmypid(); EchoFormat::out("fork=" . $pname . ", pid=".$this->pid); cli_set_process_title($process_name.": worker process: " . $pname); Yii::$app->runAction($action, $params); exit(0); } //mgr $this->running_children_pid[$pname][] = $child_pid; } } //EchoFormat::out("".$this->pid." pcntl_signal"); //信号处理 pcntl_signal(SIGTERM, array($this, "signalHandler"), false); pcntl_signal(SIGINT, array($this, "signalHandler"), false); pcntl_signal(SIGQUIT, array($this, "signalHandler"), false); pcntl_signal(SIGCHLD, array($this, "signalHandler"), false); self::saveMasterPid(); //EchoFormat::out("".$this->pid." pcntl_signal_dispatch begin"); pcntl_signal_dispatch(); //EchoFormat::out("".$this->pid." pcntl_signal_dispatch over"); sleep(1); } while (count($this->running_children_pid)) { pcntl_signal_dispatch(); $stopped_pid = pcntl_wait($status); echo "loop wait pid=".$stopped_pid,"\n"; $this->removePid($stopped_pid); //sleep(1); usleep(100); } } public function actionStart() { if (php_sapi_name() != "cli") { die("only run in command line mode\n"); } $daemon_config = Yii::$app->params["daemon_config"]; $process_name = $daemon_config["process_name"]; $master_pid = @file_get_contents(self::$pid_file); $master_is_alive = $master_pid && @posix_kill($master_pid, SIG_DFL); if ($master_is_alive) { echo $process_name.": already running\n"; exit; } set_time_limit(0); umask(0); //把文件掩码清0 $master_pid = pcntl_fork(); if ($master_pid < 0) { echo "fork master failed!\n"; } else if ($master_pid) { exit(0); } posix_setsid();//设置新会话组长,脱离终端 $this->pid = getmypid(); cli_set_process_title($process_name.": master process"); self::saveMasterPid(); fclose(STDOUT); fclose(STDERR); global $STDOUT, $STDERR; $log_dir = __DIR__ . "/../../runtime/logs/console/"; if(is_dir($log_dir) == false) { mkdir($log_dir, 0777, true); } chmod($log_dir, 0777); $STDOUT = fopen($log_dir."console_echo-" . date("Y-m-d") . ".log", "ab"); $STDERR = fopen($log_dir."console_err-" . date("Y-m-d") . ".log", "ab"); $this->onStart(); EchoFormat::out("master stopped"); } public function actionStop() { $daemon_config = Yii::$app->params["daemon_config"]; $process_name = $daemon_config["process_name"]; EchoFormat::out("daemon stop.."); //system("kill $(ps -ef|grep \"".$process_name.": master\"|awk '$0 !~/grep/ {print $2}' |tr -s '\n' ' ')"); $master_pid = @file_get_contents(self::$pid_file); posix_kill($master_pid, SIGTERM); } protected static function saveMasterPid() { self::$_master_pid = posix_getpid(); if (false === @file_put_contents(self::$pid_file, self::$_master_pid)) { throw new \Exception('can not save pid to ' . self::$pid_file); } } public function actionRestart() { self::actionStop(); sleep(1); $master_pid = @file_get_contents(self::$pid_file); while($master_is_alive = $master_pid && @posix_kill($master_pid, SIG_DFL)) { sleep(1); } self::actionStart(); } }可以看到在onStart函数里是循环,用于不停的检查子进程的状态,如果没有启动的情况下,会fork一个子进程,然后通过:
Yii::$app->runAction($action, $params);进入具体的daemon进程的controller,那么我们定义了一个抽象类来抽象这个controller的功能:
<?php namespace app\commands\daemon; use app\components\DbMethods; use app\components\RedisLink; use yii\console\Controller; abstract class DaemonBaseController extends Controller { private $running = true; //信号处理函数 public function signalHandler($signo) { echo "" . getmypid() . " signalHandler($signo)\n"; switch ($signo) { //中断进程 case SIGTERM: case SIGHUP: case SIGQUIT: $this->running = false; break; default: break; } } public function actionRun($init_params = [], $key = "") { pcntl_signal(SIGTERM, array($this, "signalHandler"), false); pcntl_signal(SIGINT, array($this, "signalHandler"), false); pcntl_signal(SIGQUIT, array($this, "signalHandler"), false); $redis_pipe = RedisLink::getRedis("pipe"); $has_data = false; while ($this->running) { while ($str_pop_data = $redis_pipe->rpop($key)) { //$STDOUT = fopen(__DIR__."/../../runtime/logs/console/console_echo.log.".date("Y-m-d", time()), "ab"); //$STDERR = fopen(__DIR__."/../../runtime/logs/console/console_err.log.".date("Y-m-d", time()), "ab"); $pop_data = json_decode($str_pop_data, true); @$id = intval($pop_data["id"]); @$data = $pop_data["data"]; EchoFormat::out("class=" . get_class($this) . " perform id=".$id." data=".print_r($data, true)); $this->perform($id, $data, $init_params); $has_data = true; } if($has_data) { DbMethods::closeAll(); } $has_data = false; usleep(100000); pcntl_signal_dispatch(); } } abstract protected function perform($id, $data, $init_params); }子进程就会进入actionRun函数,并且不停的通过$redis_pipe->rpop($key)从redis的队列里取数据,然后将数据传给继承类需要完成的abstract protected function perform($id, $data, $init_params);往redis的队列里放入数据的代码,可以放在components文件夹,让其他业务共享:
<?php namespace app\components; class AsynQueue { /** * @param string $key缓存redis键名 * @param int $id * @param array $data */ public static function pushData($key, $id, $data) { $redis_pipe = RedisLink::getRedis("pipe"); $push_data = array(); $push_data["id"] = $id; $push_data["data"] = $data; $redis_pipe->lpush($key, json_encode($push_data)); } }在config里面放置一个daemon_config.php的配置文件,例如:
<?php namespace app\config; return [ "process_name" => "test_daemon", "daemons" => [ "sms" => [ "controller" => "daemon/sms-daemon", "init_params" => [], "count" => 1, "key" => "sms_pipe", ], ] ];再写一个daemon controller的例子:
<?php namespace app\commands\daemon; use Yii; class UserDaemonController extends DaemonBaseController { protected function perform($id, $data, $init_params) { switch($id) { case 1: //调用某个服务 break; case 2: { //调用某个服务 } break; case 3: { //调用某个服务 } break; default: break; } } }