通过GatewayWorker/Workerman搭建Websocket微服务
背景
最近在一些项目需要用到Websocket
实时推送给分组的用户,前端需要传输给后端的信息比较少,通过多方考虑选择了通过GatewayWorker框架
(基于Workerman)搭建微服务。
介绍
Workerman
Workerman是一款纯PHP开发的开源高性能的PHP socket 服务框架。
Workerman不是重复造轮子,它不是一个MVC框架,而是一个更底层更通用的socket服务框架,你可以用它开发tcp代理、梯子代理、做游戏服务器、邮件服务器、ftp服务器、甚至开发一个php版本的redis、php版本的数据库、php版本的nginx、php版本的php-fpm等等。Workerman可以说是PHP领域的一次创新,让开发者彻底摆脱了PHP只能做WEB的束缚。
实际上Workerman类似一个PHP版本的nginx,核心也是多进程+Epoll+非阻塞IO。Workerman每个进程能维持上万并发连接。由于本身常住内存,不依赖Apache、nginx、php-fpm这些容器,拥有超高的性能。同时支持TCP、UDP、UNIXSOCKET,支持长连接,支持Websocket、HTTP、WSS、HTTPS等通讯协以及各种自定义协议。拥有定时器、异步socket客户端、异步Mysql、异步Redis、异步Http、异步消息队列等众多高性能组件。
github地址:https://github.com/walkor/Workerman
文档:http://doc.workerman.net/315110
GatewayWorker
GatewayWorker基于Workerman开发的一个项目框架,用于快速开发TCP长连接应用,例如app推送服务端、即时IM服务端、游戏服务端、物联网、智能家居等等
GatewayWorker使用经典的Gateway和Worker进程模型。Gateway进程负责维持客户端连接,并转发客户端的数据给BusinessWorker进程处理,BusinessWorker进程负责处理实际的业务逻辑(默认调用Events.php处理业务),并将结果推送给对应的客户端。Gateway服务和BusinessWorker服务可以分开部署在不同的服务器上,实现分布式集群。
GatewayWorker提供非常方便的API,可以全局广播数据、可以向某个群体广播数据、也可以向某个特定客户端推送数据。配合Workerman的定时器,也可以定时推送数据。
github地址:https://github.com/walkor/GatewayWorker
文档:http://doc2.workerman.net/326102
Workerman与GatewayWorker的关系
Workerman可以看做是一个纯粹的socket类库,可以开发几乎所有的网络应用,不管是TCP的还是UDP的,长连接的还是短连接的。Workerman代码精简,功能强大,使用灵活,能够快速开发出各种网络应用。同时Workerman相比GatewayWorker也更底层,需要开发者有一定的多进程编程经验。
因为绝大多数开发者的目标是基于Workerman开发TCP长连接应用,而长连接应用服务端有很多共同之处,例如它们有相同的进程模型以及单发、群发、广播等接口需求。所以才有了GatewayWorker框架,GatewayWorker是基于Workerman开发的一个TCP长连接框架,实现了单发、群送、广播等长连接必用的接口。GatewayWorker框架实现了Gateway Worker进程模型,天然支持分布式多服务器部署,扩容缩容非常方便,能够应对海量并发连接。可以说GatewayWorker是基于Workerman实现的一个更完善的专门用于实现TCP长连接的项目框架。
GatewayClient
GatewayClient是GatewayWorker的客户端程序,可以进行推送、分组、统计等操作。
websocket微服务介绍
总体原则,websocket微服务不处理业务逻辑,仅仅是一个单向连接,只负责推送信息。但客户端连接websocket微服务时,websocket微服务返回给客户端clientId,客户端调用接口把clientId传给后端,此时后端就可以通过GatewayClient绑定用户到具体分组。但需要推送时,通过text协议与GatewayWorker通信,把要推送的clientId或者分组传给GatewayWorker,GatewayWorker再推送给客户端。图示如下:
具体实现
安装GatewayWorker内核
新建一个空白项目(不在Laravel/Lumen/ThinkPHP 等PHP框架里),执行
composer require workerman/gateway-worker
启动文件
在根目录新建start.php作为启动文件,代码:
<?php ini_set('display_errors', 'on'); use Workerman\Worker; if(strpos(strtolower(PHP_OS), 'win') === 0) { exit("start.php not support windows, please use start_for_win.bat\n"); } // 检查扩展 if(!extension_loaded('pcntl')) { exit("Please install pcntl extension. See http://doc3.workerman.net/appendices/install-extension.html\n"); } if(!extension_loaded('posix')) { exit("Please install posix extension. See http://doc3.workerman.net/appendices/install-extension.html\n"); } // 标记是全局启动 define('GLOBAL_START', 1); require_once __DIR__ . '/vendor/autoload.php'; // 加载所有Applications/*/start.php,以便启动所有服务 foreach(glob(__DIR__.'/src/start*.php') as $start_file) { require_once $start_file; } // 运行所有服务 Worker::runAll();
注册Register类
Register类负责注册内部通讯地址。Gateway进程和BusinessWorker进程启动后分别向Register进程注册自己的通讯地址,Gateway进程和BusinessWorker通过Register进程得到通讯地址后,就可以建立起连接并通讯了。
src/start_register.php (目录名可以自己定义) 代码:
<?php use \GatewayWorker\Register; // register 服务必须是text协议 $register = new Register('text://0.0.0.0:1238');
注册Gateway类
Gateway类用于初始化Gateway进程。Gateway进程是暴露给客户端的让其连接的进程。所有客户端的请求都是由Gateway接收然后分发给BusinessWorker处理,同样BusinessWorker也会将要发给客户端的响应通过Gateway转发出去。
src/start_gateway.php 代码:
<?php use \Workerman\Worker; use \Workerman\WebServer; use \GatewayWorker\Gateway; use \GatewayWorker\BusinessWorker; use \Workerman\Autoloader; // gateway 进程 $gateway = new Gateway("websocket://0.0.0.0:8282"); // gateway名称,status方便查看 $gateway->name = 'business-gateway'; // gateway进程数 $gateway->count = 2; // 本机ip,分布式部署时使用内网ip $gateway->lanIp = '127.0.0.1'; // 内部通讯起始端口,假如$gateway->count=4,起始端口为4000 // 则一般会使用4000 4001 4002 4003 4个端口作为内部通讯端口 $gateway->startPort = 2900; // 服务注册地址(register类地址) $gateway->registerAddress = '127.0.0.1:1238';
注册BusinessWorker类
BusinessWorker是运行业务逻辑的进程,BusinessWorker收到Gateway转发来的事件及请求时会默认调用Events.php中的onConnect onMessage onClose方法处理事件及数据,开发者正是通过实现这些回调控制业务及流程。
src/start_businessworker.php 代码:
<?php use \Workerman\Worker; use \Workerman\WebServer; use \GatewayWorker\Gateway; use \GatewayWorker\BusinessWorker; use \Workerman\Autoloader; // bussinessWorker 进程 $worker = new BusinessWorker(); // worker名称 $worker->name = 'Steam-BusinessWorker'; // bussinessWorker进程数量 $worker->count = 1; // 服务注册地址 $worker->registerAddress = '127.0.0.1:1238';
Events类
Events类用于捕获GatewayWorker事件,在这里可以写一些回调信息。
src/Events.php 代码:
<?php /** * 用于检测业务代码死循环或者长时间阻塞等问题 * 如果发现业务卡死,可以将下面declare打开(去掉//注释),并执行php start.php reload * 然后观察一段时间workerman.log看是否有process_timeout异常 */ //declare(ticks=1); use \GatewayWorker\Lib\Gateway; /** * 主逻辑 * 主要是处理 onConnect onMessage onClose 三个方法 * onConnect 和 onClose 如果不需要可以不用实现并删除 */ class Events { /** * 当客户端连接时触发 * 如果业务不需此回调可以删除onConnect * * @param int $client_id 连接id */ public static function onConnect($client_id) { // 向当前client_id发送数据 Gateway::sendToClient($client_id, json_encode([ 'clientId' => $client_id, ])); } /** * 当客户端发来消息时触发 * @param int $client_id 连接id * @param mixed $message 具体消息 */ public static function onMessage($client_id, $message) { } /** * 当用户断开连接时触发 * @param int $client_id 连接id */ public static function onClose($client_id) { } }
在PHP项目分组或推送给客户端
这是在你自己的项目写的代码,过程:前端调用接口传来clientId,后端绑定到分组,再推送信息给分组或者指定的clientId客户端。
需要在项目中引用GatewayClient包
composer require workerman/gatewayclient
代码:
// GatewayClient 3.0.0版本以后加了命名空间 use GatewayClient\Gateway; /** * === 指定registerAddress表明与哪个GatewayWorker(集群)通讯。=== * GatewayWorker里用Register服务来区分集群,即一个GatewayWorker(集群)只有一个Register服务, * GatewayClient要与之通讯必须知道这个Register服务地址才能通讯,这个地址格式为 ip:端口 , * 其中ip为Register服务运行的ip(如果GatewayWorker是单机部署则ip就是运行GatewayWorker的服务器ip), * 端口是对应ip的服务器上start_register.php文件中监听的端口,也就是GatewayWorker启动时看到的Register的端口。 * GatewayClient要想推送数据给客户端,必须知道客户端位于哪个GatewayWorker(集群), * 然后去连这个GatewayWorker(集群)Register服务的 ip:端口,才能与对应GatewayWorker(集群)通讯。 * 这个 ip:端口 在GatewayClient一侧使用 Gateway::$registerAddress 来指定。 * * === 如果GatewayClient和GatewayWorker不在同一台服务器需要以下步骤 === * 1、需要设置start_gateway.php中的lanIp为实际的本机内网ip(如不在一个局域网也可以设置成外网ip),设置完后要重启GatewayWorker * 2、GatewayClient这里的Gateway::$registerAddress的ip填写填写上面步骤1lanIp所指定的ip,端口 * 3、需要开启GatewayWorker所在服务器的防火墙,让以下端口可以被GatewayClient所在服务器访问, * 端口包括Rgister服务的端口以及start_gateway.php中lanIp与startPort指定的几个端口 * * === 如果GatewayClient和GatewayWorker在同一台服务器 === * GatewayClient和Register服务都在一台服务器上,ip填写127.0.0.1及即可,无需其它设置。 **/ Gateway::$registerAddress = '127.0.0.1:1236'; // GatewayClient支持GatewayWorker中的所有接口(Gateway::closeCurrentClient Gateway::sendToCurrentClient除外) Gateway::sendToAll($data); Gateway::sendToClient($client_id, $data); Gateway::closeClient($client_id); Gateway::isOnline($client_id); Gateway::bindUid($client_id, $uid); Gateway::isUidOnline($uid); Gateway::getClientIdByUid($client_id); Gateway::unbindUid($client_id, $uid); Gateway::sendToUid($uid, $dat); Gateway::joinGroup($client_id, $group); Gateway::sendToGroup($group, $data); Gateway::leaveGroup($client_id, $group); Gateway::getClientCountByGroup($group); Gateway::getClientSessionsByGroup($group); Gateway::getAllClientCount(); Gateway::getAllClientSessions(); Gateway::setSession($client_id, $session); Gateway::updateSession($client_id, $session); Gateway::getSession($client_id);