PHP之高性能I/O框架:Libevent(一)
Libevent 是一个用C语言编写的、轻量级的开源高性能I/O框架,支持多种 I/O 多路复用技术: epoll、 poll、 dev/poll、 select 和 kqueue 等;支持 I/O,定时器和信号等事件;注册事件优先级。PHP提供了对应的扩展 libevent、 Event 。
libevent扩展很久没有更新了,仅支持PHP5系列,PHP7虽然有网友fork了 libevent 扩展的源码进行更新兼容,但是稳定性不好,可能会出现段错误,所以PHP7最好使用 Event 扩展。
与libevent扩展不同的是,Event 扩展提供了面向对象的接口,且支持更多特性。
libevent扩展
libevent地址: http://pecl.php.net/package/l...
libevent文档: http://docs.php.net/libevent
系统需要先安装 Libevent 库:
yum install libevent-dev
然后安装PHP扩展。
PHP5安装:
pecl install libevent-0.1.0
PHP7安装(不稳定):
git clone https://github.com/expressif/pecl-event-libevent.git cd pecl-event-libevent phpize ./configure make && sudo make install
注:后面的代码示例均使用的php5.6
+ libevent-0.1.0
环境。
基本使用
下面的例子实现了一个单进程的TCP server,基于libevent实现I/O复用,达到高性能。
libevent_tcp_server.php
<?php /** * Created by PhpStorm. * User: 公众号: 飞鸿影的博客(fhyblog) * Date: 2018/6/23 */ $receive = []; $master = []; $buffers = []; $socket = stream_socket_server ("tcp://0.0.0.0:9201", $errno, $errstr); if (false === $socket ) { echo "$errstr($errno)\n"; exit(); } if (!$socket) die($errstr."--".$errno); stream_set_blocking($socket,0); $id = (int)$socket; $master[$id] = $socket; echo "waiting client...\n"; //accept事件回调函数,参数分别是$fd, $events, $arg。 //也就是 event_set 函数的$fd, $events, $arg参数。 function ev_accept($socket, $flag, $base){ global $receive; global $master; global $buffers; $connection = stream_socket_accept($socket); stream_set_blocking($connection, 0); $id = (int)$connection; echo "new Client $id\n"; $event = event_new(); event_set($event, $connection, EV_READ | EV_PERSIST, 'ev_read', $id); event_base_set($event, $base); event_add($event); $master[$id] = $connection; //如果去掉该行,客户端直接被断开 $receive[$id] = ''; //如果去掉该行,服务端无法正常收到消息 $buffers[$id] = $event; //如果去掉该行,客户端强制断开再连接,服务端无法正常收到消息 } //read事件回调函数 function ev_read($buffer, $flag, $id) { global $receive; global $master; global $buffers; //该方法里的$buffer和$master[$id]指向相同的内容 // var_dump(func_get_args(), $master[$id] ); //循环读取并解析客户端消息 while( 1 ) { $read = @fread($buffer, 1024); //客户端异常断开 if($read === '' || $read === false){ break; } $pos = strpos($read, "\n"); if($pos === false) { $receive[$id] .= $read; // echo "received:".$read.";not all package,continue recdiveing\n"; }else{ $receive[$id] .= trim(substr ($read,0,$pos+1)); $read = substr($read,$pos+1); switch ( $receive[$id] ){ case "quit": echo "client close conn\n"; // fclose($master[$id]); //断开客户端连接 // event_del($buffers[$id]); //删除事件 //下面的写法与上面调用函数效果一样,都是关闭客户端连接 unset($master[$id]); unset($buffers[$id]); break; default: // echo "all package:\n"; echo $receive[$id]."\n"; break; } $receive[$id]=''; } } } //创建全局event base $base = event_base_new(); //创建 event $event = event_new(); //设置 event:其中$events设置为EV_READ | EV_PERSIST ;回调事件为ev_accept,参数 $base //EV_PERSIST可以让注册的事件在执行完后不被删除,直到调用event_del()删除. event_set($event, $socket, EV_READ | EV_PERSIST, 'ev_accept', $base); // 全局event base添加 当前event event_base_set($event, $base); event_add($event); echo "start run...\n"; //进入事件循环 event_base_loop($base); //下面这句不会被执行 echo "This code will not be executed.\n";
我们先运行代码:
$ php libevent_tcp_server.php waiting client... start run...
客户端使用telnet:
$ telnet 127.0.0.1 9201 Trying 127.0.0.1... Connected to 127.0.0.1. Escape character is '^]'. hello server!
代码里面我加了很多注释,基本上能看明白。需要注意的是:
1、event_base
是全局的,只需要创建一次,后续都是event的设置和添加。
2、event_set
的回调函数有三个参数,分别是$fd
, $events
, $arg
。也就是 event_set 函数的$fd
, $events
, $arg
参数。arg 如果需要多个,可以为数组。fd参数实际是保存的客户端连接,是个resource。events参数支持下列这些常量:
EV_TIMEOUT
: 超时。利用事件可以实现定时器EV_READ
: 只要网络缓冲中还有数据,回调函数就会被触发EV_WRITE
: 只要塞给网络缓冲的数据被写完,回调函数就会被触发EV_SIGNAL
: POSIX信号量EV_PERSIST
: 不指定这个属性的话,回调函数被触发后事件会被删除EV_ET
: Edge-Trigger边缘触发
3、ev_accept
回调里面,后面几行如果不设置,会出现异常。目前没有找到好的解释。
4、EV_READ
回调里面,删除客户端连接使用 unset也可以达到同样效果,这个和第3点一样,没有找到好的解释。
使用event_buffer
libevent还提供了event_buffer_
系列函数。手册里的解释是:Libevent在基础的API里提供了一层抽象层,使用 buffered event ,我们无序手动处理I/O。估计是对性能的提升。
示例: libevent_buffer_tcp_server.php
<?php /** * Created by PhpStorm. * User: 公众号: 飞鸿影的博客(fhyblog) * Date: 2018/6/23 */ $receive = []; $master = []; $buffers = []; $socket = stream_socket_server ("tcp://0.0.0.0:9201", $errno, $errstr); if (false === $socket ) { echo "$errstr($errno)\n"; exit(); } if (!$socket) die($errstr."--".$errno); stream_set_blocking($socket,0); $id = (int)$socket; $master[$id] = $socket; echo "waiting client...\n"; function ev_accept($socket, $flag, $base){ global $receive; global $master; global $buffers; $connection = stream_socket_accept($socket); stream_set_blocking($connection, 0); $id = (int)$connection; echo "new Client $id\n"; //#1 下面改成了event_buffer事件,与event事件有些不同 //event_buffer_new额外支持写、错误事件 $buffer = event_buffer_new($connection, 'ev_read', 'ev_write', 'ev_error', $id); event_buffer_base_set($buffer, $base); //指定超时时间,单位秒 event_buffer_timeout_set($buffer, 30, 30); //设置水位,参考:https://www.cnblogs.com/nengm1988/p/8203784.html event_buffer_watermark_set($buffer, EV_READ, 0, 0xffffff); //设置优先级 event_buffer_priority_set($buffer, 10); //开启event_buffer event_buffer_enable($buffer, EV_READ | EV_PERSIST); $master[$id] = $connection; $receive[$id] = ''; $buffers[$id] = $buffer; } //#2 read回调,由于使用了event_buffer,这里仅接受2个参数,分别是fd和arg function ev_read($buffer, $id) { // var_dump(func_get_args()); global $receive; global $master; global $buffers; while( 1 ) { //#3 使用event_buffer_read,而不是fread $read = @event_buffer_read($buffer, 1024); if($read === '' || $read === false) { break; } $pos = strpos($read, "\n"); if($pos === false) { $receive[$id] .= $read; echo "received:".$read.";not all package,continue recdiveing\n"; }else{ $receive[$id] .= trim(substr ($read,0,$pos+1)); $read = substr($read,$pos+1); switch ( $receive[$id] ){ case "quit": echo "client close conn\n"; unset($master[$id]); unset($buffers[$id]); // fclose($master[$id]); // event_buffer_free($buffers[$id]); break; default: echo "all package:\n"; echo $receive[$id]."\n"; break; } $receive[$id]=''; } } } function ev_write($buffer, $id) { echo "$id -- " ."\n"; } function ev_error($buffer, $error, $id) { echo "ev_error - ".$error."\n"; } $base = event_base_new(); $event = event_new(); event_set($event, $socket, EV_READ | EV_PERSIST, 'ev_accept', $base); event_base_set($event, $base); event_add($event); echo "start run...\n"; event_base_loop($base);
注释我都写了,相比前一个例字,主要有3个地方不同:
1、ev_accept
里设置read事件全换成了待buffer的函数;
2、EV_READ
回调接收参数为2个;
3、EV_READ
回调里读取消息使用 event_buffer_read
,而不是fread。另外增加了EV_WRITE
,ev_error
回调。
定时器
libevent提供了event_timer_*
系列函数,实现一次性定时器,精度微秒。
libevent_timer.php
<?php /** * Created by PhpStorm. * User: 公众号: 飞鸿影的博客(fhyblog) * Date: 2018/6/23 */ $TIME_INTVAL = 1000000; //单位微秒 //回调函数 function ev_timer($fd, $events, $args){ // var_dump(func_get_args()); //打印结果:参数fd为NULL,参数events固定为EV_TIMEOUT常量 static $c; $c++; echo time()." hello\n"; event_timer_add($args[1], $args[0]);//再次添加定时器 if($c > 5){ event_timer_del($args[1]); //删除定时器 } } $base = event_base_new(); $ev_timer = event_timer_new(); event_timer_set($ev_timer, 'ev_timer', [$TIME_INTVAL, $ev_timer]); event_base_set($ev_timer, $base); event_timer_add($ev_timer, $TIME_INTVAL);//单位微秒 event_base_loop($base);
上面的例子实现了每1秒执行一次回调函数。
使用event_*
系列函数也可以实现: libevent_timer2.php
<?php /** * Created by PhpStorm. * User: 公众号: 飞鸿影的博客(fhyblog) * Date: 2018/6/23 */ $TIME_INTVAL = 1000000; function ev_timer($fd, $events, $args){ static $c; $c++; echo time()." hello\n"; event_timer_add($args[1], $args[0]); if($c > 5){ event_timer_del($args[1]); } } $base = event_base_new(); $event = event_new(); event_set($event, 0, EV_TIMEOUT, 'ev_timer', [$TIME_INTVAL, $event]); event_base_set($event, $base); event_add($event, $TIME_INTVAL); event_base_loop($base);
可以看出,event_timer_*
系列函数是对event_*
系列函数EV_TIMEOUT
事件的包装。
总结
event_*
系列函数基本上可以分为上面三大类。还有几个函数没有提到,大家看手册就能了解。
(未完待续)
欢迎关注公众号及时获取最新文章推送!
推荐!每月仅需$2.5,即可拥有配置SSD的VPS!