centos(php7)下gearman实现异步处理队列任务

安装gearman需要的环境

wget https://github.com/libevent/libevent/releases/download/release-2.1.8-stable/libevent-2.1.8-stable.tar.gz
tar zxvf libevent-2.1.8-stable.tar.gz
cd libevent-2.1.8-stable
./configure --prefix=/usr
make
make install

yum install libevent-devel -y
yum install boost boost-devel -y
yum install gperf* -y
yum install libuuid-devel -y

安装gearman

wget https://github.com/gearman/gearmand/releases/download/1.1.17/gearmand-1.1.17.tar.gz
tar zxvf gearmand-1.1.17.tar.gz
cd gearmand-1.1.17
./configure --prefix=/usr/local/gearman
make
make install
/sbin/ldconfig

gearman自动

mikdir /usr/local/gearman/var/log
gearmand -d -l /usr/local/gearman/var/log/gearman.log

gearmand -V
# 输出:gearmand 1.1.17 - https://github.com/gearman/gearmand/issues

安装php扩展gearman

wget http://pecl.php.net/get/gearman-1.1.2.tgz
#如果是php7:wget https://github.com/wcgallego/pecl-gearman/archive/master.zip
tar zxvf gearman-1.1.2.tgz
cd gearman-1.1.2
/usr/local/php/bin/phpize
./configure --with-php-config=/usr/local/php/bin/php-config
make
make install

修改php.ini

extension = gearman.so
#重启php-fpm 或者 重启apache

程序实现

简单的的字符反转案例(来自官方翻译,稍作修改)

client.php 任务发送方

<?php

$client = new GearmanClient();

// 添加服务器
// php官方文档:http://php.net/manual/en/gearmanclient.addserver.php
// $client->addServer("10.0.0.2", 7003);
$client->addServer();

echo "发送任务: \n";

// 发送 {"msg":"test"}, workload消息给 task_func_name 任务
$data = '{"msg":"' . $argv[1] . '"}';
$result = $client->doNormal("task_func_name", $data);      //普通任务,阻塞
#$result = $client->doBackground("task_func_name", $data); //后台任务,直接返回任务句柄
var_dump($result);

?>

work.php 任务接收消费方

<?php

$worker = new GearmanWorker();

$worker->addServer() or die("无法链接服务器");

// 通知服务器,$worker绑定task_func_name处理函数work_func。
$worker->addFunction("task_func_name", "work_func");

while (1) {
    print "接收任务...\n";
    $ret = $worker->work(); // reverse_fn 任务执行完成
    if ($ret != true) { //代码和下面一样
        break;
    }
    //获取最后一次执行任务的返回码 执行成功正常返回0
    if ($worker->returnCode() != GEARMAN_SUCCESS) {
        break;
    }
}

// A much simple reverse function
function work_func(GearmanJob $job) {
    $workload = $job->workload(); //取得发送方发送的消息
    echo "Received job: " . $job->handle() . "\n";  //获取任务的句柄,有服务器自动分配,对用户不透明
    echo "Workload: $workload\n";
    $result = json_decode($workload, true);
    $result['msg'] = strrev($result['msg']);
    $json = json_encode($result);
    echo "Result: $json\n";
    return $json;
}

?>

调用方式:

#启动接收消费方
php work.php
#output: 接收任务...

//启动发送消息方
php client.php testmsg1
# output: 
# 发送任务:
# string(18) "{"msg":"1gsmtset"}"
php client.php testmsg2
# output: 
# 发送任务:
# string(18) "{"msg":"2gsmtset"}"

修改client里改成后台
php client.php testmsg1
# output: 
# string(26) "H:localhost.localdomain:59" //这里不再返回的结果了

总结

不管是阻塞还是异步直接返回,消费者这边都是根据入队列的顺序依次执行,即:一个执行完再接一个,如果一个任务消费很长时间,后续任务都会在等待前个处理完才可以继续执行

多任务案列(来自官方,小刀改)

client.php

<?php
$client = new GearmanClient();
$client->addServer();

// 初始化三个变量
$userInfo = $friends = $posts = null;

//消费端执行完成任务这里会被异步调用
$client->setCompleteCallback(function(GearmanTask $task, $context) use (&$userInfo, &$friends, &$posts) {
    switch($context) {
        case 'content_task1':
            $userInfo = $task->data();
            $aa = $task->data();
            break;
        case 'content_task2':
            $friends = $task->data();
            $aa = $task->data();
            break;
        case 'content_task3':
            $posts = $task->data();
            $aa = $task->data();
            break;
    }
});

// 添加任务
$client->addTask('task1', 'qkl', 'content_task1');
$client->addTask('task2', 'qkl', 'content_task2');
$client->addTask('task3', 'qkl', 'content_task3');

echo "读取中...\n";
$start = microtime(true);
$client->runTasks();
$totaltime = number_format(microtime(true) - $start, 2);

echo "得到结果消耗的时间: $totaltime 秒:\n";
var_dump($userInfo, $friends, $posts);
?>

work.php

<?php

$worker = new GearmanWorker();
$worker->addServer();

//这里注意,多任务的执行顺序是根据消费者这边注册消息类型的顺序依次来执行,且要等上一次执行完成才可以继续
//现在注册顺序是task1 task2 task3 执行顺序就是task1 task2 task3
//现在注册顺序是task3 task2 task1 执行顺序就是task3 task2 task1
$worker->addFunction('task1', function(GearmanJob $job){
    sleep(2);
    return 'task1: '. $job->workload();
});

$worker->addFunction('task2', function(GearmanJob $job){
    return 'task2: '. $job->workload();
});

$worker->addFunction('task3', function(GearmanJob $job){
    return 'task3: '. $job->workload();
});

while (true) {
    $ret = $worker->work();
    var_dump($ret);
};

?>

总结

这里多任务场景目前没有特别好的场景构想,理论那些一步步同步执行顺序的流程的业务,只是gearman可以client添加任务后直接执行后续任务,业务交给消费端慢慢执行

相关推荐