swoole_process实现多进程

简介

swoole_process 是swoole提供的进程管理模块,用来替代PHP的pcntl扩展。

首先,确保安装的swoole版本大于1.7.2:

$ php --ri swoole

swoole

swoole support => enabled
Version => 1.10.1

实例说明

本例里待消费的是三个shell命令,会分别创建一个子进程来消费。消费的时候故意sleep了1秒,以便直观看到效果。

process_t1.php

<?php

$start_time = microtime(TRUE);

$cmds = [
    "uname",
    "date",
    "whoami"
];

foreach ($cmds as $cmd) {
    $process = new swoole_process( "my_process", true);

    $process->start();
    $process->write($cmd); //通过管道发数据到子进程。管道是单向的:发出的数据必须由另一端读取。不能读取自己发出去的

    echo $rec = $process->read();//同步阻塞读取管道数据
}

//子进程创建成功后要执行的函数
function my_process(swoole_process $worker){
    sleep(1);//暂停1s

    $cmd = $worker->read();

    // $return = exec($cmd);//exec只会输出命令执行结果的最后一行内容,且需要显式打印输出

    ob_start();
    passthru($cmd);//执行外部程序并且显示未经处理的、原始输出,会直接打印输出。
    $return = ob_get_clean();
    if(!$return) $return = 'null';
    
    $worker->write($return);//写入数据到管道
}

//子进程结束必须要执行wait进行回收,否则子进程会变成僵尸进程
while($ret = swoole_process::wait()){// $ret 是个数组 code是进程退出状态码,
    $pid = $ret['pid'];
    echo PHP_EOL."Worker Exit, PID=" . $pid . PHP_EOL;
}

$end_time = microtime(TRUE);
echo sprintf("use time:%.3f s\n", $end_time - $start_time);

命令行里运行:

$ php process_t1.php  

Linux
Sat Apr 21 15:29:55 CST 2018
root

Worker Exit, PID=672

Worker Exit, PID=674

Worker Exit, PID=676
use time:3.080 s

大家会觉得很奇怪,为什么开了三个子进程,还是用了3秒,应该是1秒左右才对呀。

原因是父进程读取子进程返回的数据的时候,是同步阻塞读取:

echo $rec = $process->read();//同步阻塞读取管道数据

导致的后果就是父进程依次等待每个进程处理完并返回了内容,才走下一次循环。

解决方案1:
使用swoole_event_add将管道加入到事件循环中,变为异步模式:

// echo $rec = $process->read();//同步阻塞读取管道数据

//使用swoole_event_add将管道加入到事件循环中,变为异步模式
swoole_event_add($process->pipe, function($pipe) use($process) {
    echo $rec = $process->read();
    
    swoole_event_del($process->pipe);//socket处理完成后,从epoll事件中移除管道
});

执行结果:

Worker Exit, PID=686

Worker Exit, PID=687

Worker Exit, PID=688
use time:1.060 s
Linux
Sat Apr 21 15:37:14 CST 2018
root

大家会发现,use time数据并不是最后打印出来的。已经是异步的了。 实际执行时间1s左右。

解决方案2:
先不获取子进程返回值,循环结束后统一返回:

foreach ($cmds as $cmd) {
    $process = new swoole_process( "my_process", true);

    $process->start();
    $process->write($cmd); //通过管道发数据到子进程

    $process_arr[] = $process;
}

foreach ($process_arr as $process){
    echo $rec = $process->read();
}

执行结果:

Linux
Sat Apr 21 15:52:24 CST 2018
root

Worker Exit, PID=694

Worker Exit, PID=693

Worker Exit, PID=695
use time:1.061 s

函数原型

swoole_process::__construct(callable $function, $redirect_stdin_stdout = false, $create_pipe = true);

第一个参数是子进程创建成功后要执行的函数。
$redirect_stdin_stdout,重定向子进程的标准输入和输出。启用此选项后,在子进程内输出内容将不是打印屏幕,而是写入到主进程管道(例如用echo打印的内容也写入管道)。读取键盘输入将变为从管道中读取数据。默认为阻塞读取。
$create_pipe,是否创建管道,启用$redirect_stdin_stdout后,此选项将忽略用户参数,强制为true。如果子进程内没有进程间通信,可以设置为 false

注意:swoole_process在最新的1.8.0版本已经禁止在Web环境中使用了,所以也只能支持命令行。这时候如果要做并发,multi-curl是不错的选择。

进程常驻后台

如果跑的服务需要一直常驻后台,可以在$process->start();前面加上:

swoole_process::daemon();

服务会在后台运行。

更多示例

多进程获取网页状态码

<?php

//获取多个网页信息

$urls = [
    'https://www.baidu.com',
    'http://www.52fhy.com',
    'http://www.52fhy.com/1',
    'https://www.52fhy.com',
];

foreach ($urls as $key => $url) {
    $process = new swoole_process(function(swoole_process $worker) use ($url){
        $code = getHttpCode($url);
        $worker->write($code);
    }, true);
    $process->start();

    swoole_event_add($process->pipe, function($pipe) use($process, $url) {
        echo sprintf("%s code: %s\n", $url, $process->read());
        swoole_event_del($pipe);
    });
}

echo "ok.\n";

while($ret = swoole_process::wait()){
    // echo PHP_EOL."Worker Exit, PID=" . $ret['pid'] . PHP_EOL;
}

/**
 * 获取网页http code
 */
function getHttpCode($url){
    $ch = curl_init();
    curl_setopt($ch, CURLOPT_URL, $url);
    curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
    curl_setopt($ch, CURLOPT_HEADER, 0);
//        curl_setopt($ch, CURLOPT_CUSTOMREQUEST, "HEAD");
    curl_setopt($ch, CURLOPT_NOBODY, true);
    curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false); //不验证证书
    curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, false); //不验证证书
    curl_setopt ($ch, CURLOPT_TIMEOUT_MS, 1000);//超时时间
    curl_exec($ch);
    $info = curl_getinfo($ch);
    curl_close($ch);

    return (string)$info['http_code'];
}

运行:

$ php process_get.php
ok.
http://www.52fhy.com code: 403
http://www.52fhy.com/1 code: 404
https://www.baidu.com code: 200
https://www.52fhy.com code: 403

使用消息队列通信

<?php

//获取多个网页信息

$urls = [
    'https://www.baidu.com',
    'http://www.52fhy.com',
    'http://www.52fhy.com/1',
    'https://www.52fhy.com',
];

$process = new swoole_process(function(swoole_process $worker) use($urls) {
    foreach ($urls as $url) {
        $code = getHttpCode($url);
        $worker->push($url.': '.$code);
    }
    $worker->push('exit');
}, false, false); //不创建管道
$process->useQueue(1, 2); //使用消息队列。消息队列通信方式与管道不可共用。消息队列不支持EventLoop,使用消息队列后只能使用同步阻塞模式非阻塞
$process->start();

while(1){
    $ret = $process->pop();
    if($ret == 'exit') break;
    echo sprintf("%s\n", $ret);
}

echo "ok.\n";

while($ret = swoole_process::wait()){
    echo PHP_EOL."Worker Exit, PID=" . $ret['pid'] . PHP_EOL;
}

/**
 * 获取网页http code
 */
function getHttpCode($url){
   //省略
}

运行:

$ php process_get_queue.php
https://www.baidu.com: 200
http://www.52fhy.com: 403
http://www.52fhy.com/1: 404
https://www.52fhy.com: 403
ok.

Worker Exit, PID=1222

参考

1、Process
https://wiki.swoole.com/wiki/...
2、swoole_process->read
https://wiki.swoole.com/wiki/...

相关推荐