RabbitMQ 使用
官方文档地址:http://www.rabbitmq.com/tutorials/tutorial-one-python.html
安装配置:http://www.ttlsa.com/linux/install-rabbitmq-on-linux/
1.Hello World: the simplest thing that does something
send.php
<?php
// Publish a single "Hello World!" message to the "hello" queue.
//
// NOTE(review): newer php-amqplib releases reportedly replace AMQPConnection
// with AMQPStreamConnection -- confirm against the installed library version.

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// Declare the queue so it exists before publishing.
$channel->queue_declare('hello', false, false, false, false);

// Build a plain message with default properties.
$msg = new AMQPMessage('Hello World!');

// Publish with an empty exchange name and 'hello' as the routing key.
$channel->basic_publish($msg, '', 'hello');

echo " [x] Sent 'Hello World!'\n";

$channel->close();
$connection->close();
client.php
<?php
// Receive messages from the "hello" queue and print each body.

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// Declare the queue here as well, with the same arguments as the sender,
// so the consumer can be started before the publisher.
$channel->queue_declare('hello', false, false, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

// Invoked once per delivered message.
$callback = function ($msg) {
    echo " [x] Received " . $msg->body . "\n";
};

// Register the consumer on the 'hello' queue.
// NOTE(review): the fourth argument (true) is presumably no_ack, i.e.
// deliveries are not acknowledged explicitly -- confirm against the library.
$channel->basic_consume('hello', '', false, true, false, false, $callback);

// Keep processing deliveries while at least one consumer callback is registered.
while (count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();
new_task.php
<?php
// Publish a task to the "task_queue" work queue.
// The task text is taken from the command-line arguments; when none are
// given, 'hello world' is sent instead.

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// Declare the queue; the third argument (true) requests a durable queue.
$channel->queue_declare('task_queue', false, true, false, false);

// array_slice (not array_splice) so $argv itself is left untouched.
$data = implode(' ', array_slice($argv, 1));
// Compare against '' rather than using empty(), so a literal "0" payload is kept.
if ($data === '') {
    $data = 'hello world';
}

// delivery_mode 2 marks the message as persistent.
$msg = new AMQPMessage($data, array('delivery_mode' => 2));

$channel->basic_publish($msg, '', 'task_queue');

echo " [x] Sent ", $data, "\n";

// Release the channel and connection explicitly instead of relying on script exit.
$channel->close();
$connection->close();
worker.php
<?php
// Consume tasks from the "task_queue" work queue.
// The original body was a copy of the publisher script and never consumed
// anything; this version receives tasks and acknowledges each one.

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPConnection;

$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// Must match the publisher's declaration (durable queue).
$channel->queue_declare('task_queue', false, true, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

// Handle one task: print it, simulate work (one second per '.' in the body),
// then acknowledge it so the broker can discard the message.
$callback = function ($msg) {
    echo " [x] Received ", $msg->body, "\n";
    sleep(substr_count($msg->body, '.'));
    echo " [x] Done", "\n";
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

// Prefetch count of 1: do not hand this worker a new task until the
// previous one has been acknowledged.
$channel->basic_qos(null, 1, null);

// Fourth argument false = manual acknowledgements (no_ack disabled).
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

while (count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();
<?php
// Publish a message to the "logs" fanout exchange.
// The message text is taken from the command-line arguments; when none are
// given, 'hello world' is sent instead.

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// Declare the 'logs' exchange with type 'fanout'.
$channel->exchange_declare('logs', 'fanout', false, false, false);

// array_slice (not array_splice) so $argv itself is left untouched.
$data = implode(' ', array_slice($argv, 1));
// Compare against '' rather than using empty(), so a literal "0" payload is kept.
if ($data === '') {
    $data = 'hello world';
}

// delivery_mode 2 marks the message as persistent.
$msg = new AMQPMessage($data, array('delivery_mode' => 2));

// Publish to the exchange; no routing key is supplied.
$channel->basic_publish($msg, 'logs');

echo " [x] Sent ", $data, "\n";

// Release the channel and connection explicitly instead of relying on script exit.
$channel->close();
$connection->close();
log_client.php
<?php
// Receive messages published to the "logs" fanout exchange and print them.

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPConnection;

$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('logs', 'fanout', false, false, false);

// Declare a queue with an empty name; the first element of the result is
// the queue name actually assigned, which is used for binding and consuming.
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

// Bind the queue to the exchange so it receives the published messages.
$channel->queue_bind($queue_name, 'logs');

echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";

// Invoked once per delivered message.
$callback = function ($msg) {
    echo ' [x] ', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

// Keep processing deliveries while at least one consumer callback is registered.
while (count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();
4.Routing
<?php
// Publish a message to the "direct_logs" direct exchange, using the
// severity as the routing key.
// Arguments: [severity] [message words...]
// Severity defaults to "info" and the message to "Hello World!".

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('direct_logs', 'direct', false, false, false);

// empty() on the array element avoids the undefined-offset notice that a
// bare $argv[1] read raises when the script is run without arguments.
$severity = !empty($argv[1]) ? $argv[1] : 'info';

$data = implode(' ', array_slice($argv, 2));
if (empty($data)) {
    $data = "Hello World!";
}

$msg = new AMQPMessage($data);

// The severity is passed as the routing key.
$channel->basic_publish($msg, 'direct_logs', $severity);

echo " [x] Sent ", $severity, ':', $data, " \n";

$channel->close();
$connection->close();
log_direct_client.php
<?php
// Receive messages from the "direct_logs" direct exchange for the
// severities given on the command line and print them.

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPConnection;

$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('direct_logs', 'direct', false, false, false);

// Declare a queue with an empty name; the first element of the result is
// the queue name actually assigned.
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

// Every command-line argument is treated as a severity to subscribe to.
$severities = array_slice($argv, 1);
if (empty($severities)) {
    file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n");
    exit(1);
}

// One binding per requested severity, with the severity as the binding key.
foreach ($severities as $severity) {
    $channel->queue_bind($queue_name, 'direct_logs', $severity);
}

echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";

// Print the routing key of each delivery together with its body.
$callback = function ($msg) {
    echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

// Keep processing deliveries while at least one consumer callback is registered.
while (count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();
相关推荐
zhuxue 2020-10-14
shenzhenzsw 2020-10-09
shyoldboy 2020-09-27
leihui00 2020-09-16
lishijian 2020-08-17
程序员伊成 2020-08-06
ljcsdn 2020-07-27
waitzkj 2020-07-25
powrexly 2020-07-20
liym 2020-07-20
zhoucheng0 2020-07-19
shenzhenzsw 2020-07-18
woaishanguosha 2020-07-18
waitzkj 2020-07-18
zhoucheng0 2020-07-08
json0000 2020-07-04
NVEFLY 2020-07-04
OnMyHeart 2020-07-04