PHP RabbitMQ 使用记录

中文介绍 :http://baike.baidu.com/view/4095865.htm?fr=aladdin

官方地址:http://www.rabbitmq.com/

PHP扩展包:http://pecl.php.net/package/amqp

1. 安装RabbitMQ  (unbuntu)
sudo apt-get install rabbitmq-server 
sudo /etc/init.d/rabbitmq-server start

2. 安装librabbitmq  (unbuntu)
sudo apt-get install mercurial 
hg clone http://hg.rabbitmq.com/rabbitmq-c 
cd rabbitmq-c 
hg clone http://hg.rabbitmq.com/rabbitmq-codegen codegen 
autoreconf -i && ./configure && make && sudo make install 
 
3. 安装php-rabbit扩展  (unbuntu)
wget http://php-rabbit.googlecode.com/files/php-rabbit.r91.tar.gz 
tar -zxvf php-rabbit.r91.tar.gz 
cd php-rabbit.r91 
/path/to/php/bin/phpize 
./configure –with-amqp –with-php-config=/path/to/php/bin/php-config 
make && sudo make install 
编辑 php.ini 添加: 
extension=rabbit.so 
输出phpinfo看下是否扩展已经加载成功 (window apache下安装要注意同时在apache 配置文件中加载LoadFile "D:/xampp/php/rabbitmq.1.dll")

4. 代码实例

发送消息:

<?php
error_reporting(E_ERROR);
set_time_limit(0);

$exchangeName = 'demo';
$queueName = 'hello';
$routeKey = 'hello';
$message = 'Hello World!';

$connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest'));
$connection->connect() or die("Cannot connect to the broker!\n");

try {
    $channel = new AMQPChannel($connection);
    $exchange = new AMQPExchange($channel);
    $exchange->setName($exchangeName);
    $queue = new AMQPQueue($channel);
    $queue->setName($queueName);
    $i = 0;
    while ($i < 100) {
        $message = $i .':'. date("Y-m-d H:i:s") ;
        $exchange->publish($message, $routeKey);
        $i +=1;
    }
    var_dump("[x] Sent 'Hello World!'");
} catch (AMQPConnectionException $e) {
    var_dump($e);
    exit();
}
$connection->disconnect(); 
?>

处理消息队列

<?php
error_reporting(E_ERROR);
set_time_limit(0);
$exchangeName = 'demo';
$queueName = 'hello';
$routeKey = 'hello';

$connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest'));
$connection->connect() or die("Cannot connect to the broker!\n");
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();
$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->declare();
$queue->bind($exchangeName, $routeKey);

var_dump('[*] Waiting for messages. To exit press CTRL+C');
while (TRUE) {
        $queue->consume('callback');
}
$connection->disconnect();

function callback($envelope, $queue) {
        $msg = $envelope->getBody();
        var_dump(" Received:" . $msg);
        $queue->nack($envelope->getDeliveryTag());
}

相关推荐