Opensearch PHP SDK协程兼容改造

摘要

本文简单的介绍了协程的概念及基本原理,以及协程在PHP中的一种实现方案(PECL/Swoole)。最后,结合Opensearch PHP SDK的协程改造过程演示了具体的使用方法。

协程

与进程、线程一样,协程是逻辑代码线之间隔离的一种方法。只不过进程和线程是由操作系统直接支持,并负责调度的;协程的粒度比线程更小,操作系统无法感知,因此调度工作必须由程序自己完成。

从目标上来看,协程与epoll等模型基本一致:都是为了降低进程(线程)调度引发的频繁上下文切换的资源消耗,最终提高系统效率。使用epoll模型编写的代码大量使用回调函数(类似下面的伪代码):

connect(uri, connected() {
    send(data, sent() {
        receive(received(response) {
            // ...
        });
    });
})

在实际编写中,一般不会使用这么深层次的函数嵌套结构,但是上例从侧面描述了异步代码的编写困境:效率高,阅读难。

与epoll模型不同,协程代码不需要编写很多回调函数,代码逻辑看起来和同步代码一样:

connect(uri);
send(data);
response = receive();
// ...

协程调度器完成了其中的调度工作:感知挂起,完成调度。

协程的概念提出的很早,只是最近有些编程语言原生支持协程(如:Go)才使得其变得较为热门。PHP解释器对各种C类库的依赖较为严重,代码中大量使用同步方法。因此直接在Zend Engine中支持协程困难重重。好在有扩展开发人员编写了大量的实现代码,为我们解决了这个问题。

PECL/Swoole

PECL/Swoole是使用C/C++开发的PHP异步网络通讯扩展,提供异步非阻塞网络通讯支持。基于PECL/Swoole扩展,我们可以在PHP非线程安全模式下实现多线程的网络通讯,提高PHP程序的吞吐能力。

自2.0开始,PECL/Swoole提供了原生的协程支持。开发者可以借助一整套新编写的类和方法实现单线程的基于协程的网络通讯。自4.0开始,PECL/Swoole重写了协程部分全部的代码,弃用了(未发布的3.0版本)基于微信C++协程库的对于协程的实现方案,自主实现了较为稳定的协程方案。

下面的代码展示了如何通过PECL/Swoole实现简单的HTTP客户端请求(与PECL/Swoole版本无关):

go(function() {
    $cli = new \Swoole\Coroutine\Http\Client('127.0.0.1', 9501);

    $cli->setHeaders(['Host' => 'localhost']);
    $cli->set(['http_proxy_host' => HTTP_PROXY_HOST, 'http_proxy_port' => HTTP_PROXY_PORT]);

    $result = $cli->get('/get?json=true');
    var_dump($cli->body);
});

代码中的匿名函数首先通过IP地址和端口号创建了HTTP客户端对象,然后分别设置了头信息和代理信息,最后通过GET方法获取URI的响应结果并输出。

示例代码中的go()函数是PECL/Swoole协程实现的核心:在其中执行的代码全部受到协程调度器的管控,并在某个协程操作挂起时自动切换到其他协程待处理的代码段中。下面的伪代码展示了如何借助go()函数同时发出多个请求:

for ($i=0; $i<10; ++$i) {
    go(function() use($i) {
        $response = request('/region');
        echo "#{$i}: " . $response . PHP_EOL;
    });
}

由于协程调度器的存在,代码不会在request()函数处停留,全部请求几乎同时发出。这就意味着获得响应的顺序也不会严格按照#0, #1, …的顺序进行:哪个请求先返回,哪个请求的的echo语句先被执行。

当然,PECL/Swoole目前只支持其自制的、经过改造的网络通讯类,其他尚未改造的阻塞函数(或方法)无法被支持。

改造手记

与大部分的PHP编写的HTTP客户端程序一样,Opensearch PHP SDK使用cURL作为默认的HTTP请求工具。借助ext/curl,我们可以实现绝大多数的阻塞式的HTTP请求(包括HTTPS请求)。但是对于协程程序来说,这里就是需要重点改造的地方。

1.改造原有代码

OpenSearch\Client\OpenSearchClient类中,我们找到了前辈们提取出的公用请求方法_curl()

private function _curl($url, $items) {
        $method = strtoupper($items['method']);
        $options = array(
            CURLOPT_HTTP_VERSION => 'CURL_HTTP_VERSION_1_1',
            CURLOPT_CONNECTTIMEOUT => $this->connectTimeout,
            CURLOPT_TIMEOUT => $this->timeout,
            CURLOPT_CUSTOMREQUEST => $method,
            CURLOPT_HEADER => false,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_USERAGENT => "opensearch/php sdk " . self::SDK_VERSION . "/" . PHP_VERSION,
            CURLOPT_HTTPHEADER => $this->_getHeaders($items),
        );

        if ($method == self::METHOD_GET) {
            $query = $this->_buildQuery($items['query_params']);
            $url .= preg_match('/\?/i', $url) ? '&' . $query : '?' . $query;
        } else{
            if(!empty($items['body_json'])){
                $options[CURLOPT_POSTFIELDS] = $items['body_json'];
            }
        }

        if ($this->gzip) {
            $options[CURLOPT_ENCODING] = 'gzip';
        }

        if ($this->debug) {
            $out = fopen('php://temp','rw');
            $options[CURLOPT_VERBOSE] = true;
            $options[CURLOPT_STDERR] = $out;
        }

        $session = curl_init($url);
        curl_setopt_array($session, $options);
        $response = curl_exec($session);
        curl_close($session);

        $openSearchResult = new OpenSearchResult();
        $openSearchResult->result = $response;

        if ($this->debug) {
            $openSearchResult->traceInfo = $this->getDebugInfo($out, $items);
        }

        return $openSearchResult;
    }

上述代码的大致流程是:

  • 设置cURL请求参数;
  • 请求并获取响应体;
  • 构建并返回OpenSearch\Generated\Common\OpenSearchResult对象;

首先,我们需要提供一个可供用户切换的开关,便于协程开发者从cURL模式切换为Swoole模式:

/** @var IHttpHandler */
    private $httpHandler = null;

    public function __construct($accessKey, $secret, $host, $options = array()) {
        // ...
        $this->httpHandler = new CUrlHttpHandler();
        // ...
    }

    public function setHttpHandler(IHttpHandler $httpHandler)
    {
        $this->httpHandler = $httpHandler;
    }

其次,定义IHttpHandler接口:

interface IHttpHandler
{
    /**
     * Performs a HTTP request and returns response body
     *
     * @return string|false
     */
    public function request($url, $items, $connectTimeout, $timeout, $gzip, $debug);
}

接口方法request()的参数和返回值保持与原_curl()方法一致,但是追加了一些原来可以通过$this->获取到的配置参数。

注:如果深入改造的话,可以考虑将这些$this->参数移入IHttpHandler的抽象实现中。

使用该接口改造原_curl()方法:

private function _curl($url, $items) {
        $response = $this->httpHandler->request($url, $items
                , $this->connectTimeout, $this->timeout, $this->gzip, $this->debug);
        // ...
    }

由于原_curl()方法中包含对OpenSearchClient类私有方法的调用,考虑建立IHttpHandler的抽象实现共享这部分方法:

abstract class AbstractHttpHandler implements IHttpHandler
{
    // Extract from OpenSearchClient
    public function _getHeaders($items) {
        // ...
    }

    // Extract from OpenSearchClient
    public function _buildQuery($params) {
        // ...
    }
}

在改造原_curl()方法时,原有的代码就可以拼接出CUrlHttpHandler

class CUrlHttpHandler extends AbstractHttpHandler
{
    public function request($url, $items, $connectTimeout, $timeout, $gzip, $debug)
    {
        $method = strtoupper($items['method']);
        $options = array(
            CURLOPT_HTTP_VERSION => 'CURL_HTTP_VERSION_1_1',
            CURLOPT_CONNECTTIMEOUT => $connectTimeout,
            CURLOPT_TIMEOUT => $timeout,
            CURLOPT_CUSTOMREQUEST => $method,
            CURLOPT_HEADER => false,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_USERAGENT => "opensearch/php sdk " . OpenSearchClient::SDK_VERSION . "/" . PHP_VERSION,
            CURLOPT_HTTPHEADER => $this->_getHeaders($items),
        );

        if ($method == OpenSearchClient::METHOD_GET) {
            $query = $this->_buildQuery($items['query_params']);
            $url .= preg_match('/\?/i', $url) ? '&' . $query : '?' . $query;
        } else{
            if(!empty($items['body_json'])){
                $options[CURLOPT_POSTFIELDS] = $items['body_json'];
            }
        }

        if ($gzip) {
            $options[CURLOPT_ENCODING] = 'gzip';
        }

        if ($debug) {
            $out = fopen('php://temp','rw');
            $options[CURLOPT_VERBOSE] = true;
            $options[CURLOPT_STDERR] = $out;
        }

        $session = curl_init($url);
        curl_setopt_array($session, $options);
        $response = curl_exec($session);
        curl_close($session);

        return $response;
    }
}

只是需要有两点修改:

  • 原有的$this->对属性的使用全部变更为局部变量,如:$this->debug更换为$debug
  • 原有的self::对常量的使用全部变更为OpenSearchClient::

最后,就是我们本次的重头戏SwooleHttpHandler了。

2.新的方法

PECL/Swoole的更新迭代速度飞快,因此其文档远远追不上最新的版本。很多时候,我们只能够靠分析其源代码探寻可以使用属性或者方法。

首先,建立请求类对象:

$host = parse_url($url, PHP_URL_HOST);
        $client = new \Swoole\Coroutine\Http\Client($host);

然后,对应cURL配置各种参数:

// ...

        // 跳过CURLOPT_HTTP_VERSION(Swoole默认使用HTTP/1.1)
        // 跳过CURLOPT_CONNECTTIMEOUT(注意:暂无法设置连接超时时间)
        // CURLOPT_TIMEOUT
        $client->set(['timeout' => $timeout]);
        // CURLOPT_CUSTOMREQUEST
        $client->setMethod($method);
        // 跳过CURLOPT_HEADER(Swoole默认将响应头、体分离)
        // 跳过CURLOPT_RETURNTRANSFER(Swoole默认返回响应体)
        // CURLOPT_USERAGENT
        $headers['User-Agent'] = "opensearch/php sdk " . OpenSearchClient::SDK_VERSION . "/" . PHP_VERSION;
        // CURLOPT_ENCODING
        if ($gzip) {
            $headers['Accept-Encoding'] = 'gzip';
        }
        // CURLOPT_HTTPHEADER
        $client->setHeaders($headers); // NAME => VALUE

接下来,根据请求类型存放请求体:

if ($method == OpenSearchClient::METHOD_GET) {
            $query = $this->_buildQuery($items['query_params']);
            $url .= preg_match('/\?/i', $url) ? '&' . $query : '?' . $query;
        } else {
            if(!empty($items['body_json'])){
                $client->setData($items['body_json']); // Request body
            }
        }

最后,请求并返回结果:

$result = $client->execute($url); // Boolean
        if (!$result) {
            return false;
        }

        return $client->body;

至此,改造完毕。

3.测试使用

注:下面的代码只是展示了改造后的客户端类如何使用,并不涉及多请求的并行演示:

go(function() {

    $coClient = OpensearchClientBuilder::build();
    $coClient->setHttpHandler(new OpenSearch\Client\SwooleHttpHandler()); // 更换请求处理器
    $coClient = new OpensearchClientResponseParser($coClient);

    $result = $coClient->get('/region');
    fprintf(STDOUT, "name=%s" . PHP_EOL, $result['result']['name']);

});

后记

虽然在Opensearch PHP SDK中支持协程并非用户提出的需求,但是作为一家技术型公司,为用户提供更多的技术选择可能性也是我们应该提倡、做到的。

本文中提到的PHP协程并非只有PECL/Swoole一种解决方案,PHP开发组也在考虑将协程内置的可能性。然而从功能完整性(即使存在上文中提到无法设置“连接超时时间”等问题)和稳定性上来看,PECL/Swoole无疑是当下最出色的。

参考文档



本文作者:timandes

阅读原文

本文为云栖社区原创内容,未经允许不得转载。

相关推荐