PHP RabbitMQ 使用记录
中文介绍 :http://baike.baidu.com/view/4095865.htm?fr=aladdin
官方地址:http://www.rabbitmq.com/
PHP扩展包:http://pecl.php.net/package/amqp
1. 安装 RabbitMQ (Ubuntu)
sudo apt-get install rabbitmq-server
sudo /etc/init.d/rabbitmq-server start
2. 安装 librabbitmq (Ubuntu)
sudo apt-get install mercurial
hg clone http://hg.rabbitmq.com/rabbitmq-c
cd rabbitmq-c
hg clone http://hg.rabbitmq.com/rabbitmq-codegen codegen
autoreconf -i && ./configure && make && sudo make install
3. 安装 php-rabbit 扩展 (Ubuntu)
wget http://php-rabbit.googlecode.com/files/php-rabbit.r91.tar.gz
tar -zxvf php-rabbit.r91.tar.gz
cd php-rabbit.r91
/path/to/php/bin/phpize
./configure --with-amqp --with-php-config=/path/to/php/bin/php-config
make && sudo make install
编辑 php.ini 添加:
extension=rabbit.so
输出 phpinfo() 查看扩展是否已经加载成功（在 Windows 的 Apache 下安装时，要注意同时在 Apache 配置文件中加载：LoadFile "D:/xampp/php/rabbitmq.1.dll"）
4. 代码实例
发送消息:
<?php
/**
 * Producer demo: publishes a batch of timestamped messages to the direct
 * exchange "demo" with routing key "hello".
 *
 * The exchange and the "hello" queue are declared and bound here as well, so
 * the script works against a fresh broker and messages sent before any
 * consumer has started are kept in the queue instead of being dropped as
 * unroutable. The declarations use the same settings as the consumer script.
 *
 * Exits with status 1 when any AMQP operation fails.
 */
error_reporting(E_ERROR);
set_time_limit(0);

$exchangeName = 'demo';
$queueName = 'hello';
$routeKey = 'hello';
$messageCount = 100;

$connection = new AMQPConnection(array(
    'host' => '127.0.0.1',
    'port' => 5672,
    'vhost' => '/',
    'login' => 'guest',
    'password' => 'guest',
));

$exitCode = 0;

try {
    // A failed connect() raises AMQPConnectionException, so it has to run
    // inside the try block for the failure to be reported cleanly.
    $connection->connect();

    $channel = new AMQPChannel($connection);

    // NOTE(review): newer php-amqp releases name these methods
    // declareExchange() / declareQueue(); switch if declare() is unavailable.
    $exchange = new AMQPExchange($channel);
    $exchange->setName($exchangeName);
    $exchange->setType(AMQP_EX_TYPE_DIRECT);
    $exchange->declare();

    $queue = new AMQPQueue($channel);
    $queue->setName($queueName);
    $queue->declare();
    $queue->bind($exchangeName, $routeKey);

    // Each payload has the form "<sequence>:<timestamp>".
    for ($i = 0; $i < $messageCount; $i++) {
        $message = $i . ':' . date('Y-m-d H:i:s');
        $exchange->publish($message, $routeKey);
    }

    var_dump("[x] Sent {$messageCount} messages");
} catch (AMQPConnectionException $e) {
    echo 'Cannot connect to the broker! ' . $e->getMessage() . "\n";
    $exitCode = 1;
} catch (AMQPException $e) {
    // Channel, exchange and queue failures all derive from AMQPException.
    echo 'AMQP error: ' . $e->getMessage() . "\n";
    $exitCode = 1;
}

// Release the connection on both the success and the failure path.
if ($connection->isConnected()) {
    $connection->disconnect();
}

if ($exitCode !== 0) {
    exit($exitCode);
}
?>
处理消息队列
<?php
/**
 * Consumer demo: declares the direct exchange "demo" and the queue "hello",
 * binds them with routing key "hello", then prints every message delivered
 * to the queue and acknowledges it.
 *
 * Runs until the process is interrupted (CTRL+C). Exits with status 1 when
 * an AMQP operation fails.
 */
error_reporting(E_ERROR);
set_time_limit(0);

/**
 * Handles one delivered message: prints its body and acknowledges it.
 *
 * @param AMQPEnvelope $envelope The delivered message.
 * @param AMQPQueue    $queue    The queue the message was consumed from.
 */
function callback($envelope, $queue)
{
    $msg = $envelope->getBody();
    var_dump(" Received:" . $msg);

    // The message was handled successfully, so confirm it with ack().
    // nack() is a negative acknowledgement and would mark it as rejected.
    $queue->ack($envelope->getDeliveryTag());
}

$exchangeName = 'demo';
$queueName = 'hello';
$routeKey = 'hello';

$connection = new AMQPConnection(array(
    'host' => '127.0.0.1',
    'port' => 5672,
    'vhost' => '/',
    'login' => 'guest',
    'password' => 'guest',
));

$exitCode = 0;

try {
    // A failed connect() raises AMQPConnectionException, so it has to run
    // inside the try block for the failure to be reported cleanly.
    $connection->connect();

    $channel = new AMQPChannel($connection);

    // NOTE(review): newer php-amqp releases name these methods
    // declareExchange() / declareQueue(); switch if declare() is unavailable.
    $exchange = new AMQPExchange($channel);
    $exchange->setName($exchangeName);
    $exchange->setType(AMQP_EX_TYPE_DIRECT);
    $exchange->declare();

    $queue = new AMQPQueue($channel);
    $queue->setName($queueName);
    $queue->declare();
    $queue->bind($exchangeName, $routeKey);

    var_dump('[*] Waiting for messages. To exit press CTRL+C');

    // consume() hands each delivery to callback(); the outer loop re-enters
    // it should it ever return, so the script keeps listening.
    while (true) {
        $queue->consume('callback');
    }
} catch (AMQPConnectionException $e) {
    echo 'Cannot connect to the broker! ' . $e->getMessage() . "\n";
    $exitCode = 1;
} catch (AMQPException $e) {
    // Channel, exchange and queue failures all derive from AMQPException.
    echo 'AMQP error: ' . $e->getMessage() . "\n";
    $exitCode = 1;
}

// Only reached after a failure; close the connection if it is still open.
if ($connection->isConnected()) {
    $connection->disconnect();
}

exit($exitCode);