rabbitMQ使用


    官方文档地址:http://www.rabbitmq.com/tutorials/tutorial-one-python.html

    安装配置:http://www.ttlsa.com/linux/install-rabbitmq-on-linux/

    1.Hello World  The simplest thing that does something

       send.php

<?php
// Minimal producer: publishes one "Hello World!" message to the "hello" queue.
include_once __DIR__."/vendor/autoload.php";
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;

// Name of the queue; also used as the routing key when publishing.
$queue = 'hello';

// Connect to the broker on localhost:5672 as guest/guest and open a channel.
$conn = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$ch = $conn->channel();

// Declare the queue (passive, durable, exclusive, auto_delete all false).
$ch->queue_declare($queue, false, false, false, false);

// Publish a plain message through the default exchange ('').
$ch->basic_publish(new AMQPMessage('Hello World!'), '', $queue);

echo " [x] Sent 'Hello World!'\n";

// Release the channel first, then the connection.
$ch->close();
$conn->close();

    client.php

    

<?php
// Minimal consumer: prints the body of every message delivered from the "hello" queue.
include_once __DIR__."/vendor/autoload.php";
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;

$conn = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$ch = $conn->channel();

// Declare the queue that messages are read from (same arguments as the sender uses).
$ch->queue_declare('hello', false, false, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C' . "\n";

// Handler invoked once per delivered message.
$onMessage = function ($delivery) {
    echo " [x] Received ", $delivery->body, "\n";
};

// Register the consumer; the fourth argument (no_ack) is true, so no manual ack is sent.
$ch->basic_consume('hello', '', false, true, false, false, $onMessage);

// Keep dispatching deliveries for as long as a consumer callback is registered.
while (count($ch->callbacks) > 0) {
    $ch->wait();
}

$ch->close();
$conn->close();

   2.Work queues

    new_task.php

   

<?php
// Work-queue producer: publishes the command-line arguments (joined by spaces)
// as one persistent task message on the durable "task_queue" queue.
// Usage: php new_task.php [message words...]   (defaults to "hello world")
include_once __DIR__."/vendor/autoload.php";
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// Third argument (durable) is true so the queue definition survives a broker restart.
$channel->queue_declare('task_queue', false, true, false, false);

// array_slice (not array_splice) reads the arguments without modifying $argv.
$data = implode(" ", array_slice($argv, 1));
// Compare against '' rather than empty() so that a payload of "0" is still sent.
if ($data === '') $data = 'hello world';
$msg = new AMQPMessage($data,
            array('delivery_mode' => 2) // 2 = make message persistent
);

// Publish through the default exchange; the routing key is the queue name.
$channel->basic_publish($msg, '', 'task_queue');

echo " [x] Sent ", $data, "\n";

// Close the channel and connection explicitly instead of relying on script exit.
$channel->close();
$connection->close();

  worker.php

<?php
// Work-queue consumer: receives tasks from the durable "task_queue" queue,
// simulates work for each one, and acknowledges it when finished.
// Start several instances to distribute tasks between workers.
include_once __DIR__."/vendor/autoload.php";
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// Must match the producer's declaration: durable queue (third argument true).
$channel->queue_declare('task_queue', false, true, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

// Handler invoked once per delivered task.
$callback = function($msg)
{
  echo " [x] Received ", $msg->body, "\n";
  // Simulate a time-consuming job: one second of work per '.' in the body.
  sleep(substr_count($msg->body, '.'));
  echo " [x] Done", "\n";
  // Acknowledge only after the work is done, so an unfinished task is
  // redelivered if this worker dies mid-way.
  $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

// Prefetch count 1: do not hand this worker a new task until it has acked the previous one.
$channel->basic_qos(null, 1, null);

// Fourth argument (no_ack) is false: acknowledgements are sent manually in the callback.
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

// Keep dispatching deliveries for as long as a consumer callback is registered.
while(count($channel->callbacks))
{
   $channel->wait();
}

$channel->close();
$connection->close();

   3. Publish/Subscribe

  

<?php
// Publish/subscribe producer: broadcasts the command-line arguments (joined by
// spaces) as one message to the "logs" fanout exchange.
// Usage: php <this script> [message words...]   (defaults to "hello world")
include_once __DIR__."/vendor/autoload.php";
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// Declare a fanout exchange named "logs" (passive, durable, auto_delete all false).
$channel->exchange_declare('logs', 'fanout', false, false, false);

// array_slice (not array_splice) reads the arguments without modifying $argv.
$data = implode(" ", array_slice($argv, 1));
// Compare against '' rather than empty() so that a payload of "0" is still sent.
if ($data === '') $data = 'hello world';
$msg = new AMQPMessage($data,
            array('delivery_mode' => 2) // make message persistent
);

// Publish to the exchange; no routing key is given.
$channel->basic_publish($msg, 'logs');

echo " [x] Sent ", $data, "\n";

// Close the channel and connection explicitly instead of relying on script exit.
$channel->close();
$connection->close();

    log_client.php

   

<?php
// Publish/subscribe consumer: binds a broker-named queue to the "logs" fanout
// exchange and prints every message it receives.
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPConnection;

$conn = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$ch = $conn->channel();

// Declare the same fanout exchange the publisher uses.
$ch->exchange_declare('logs', 'fanout', false, false, false);

// Empty name lets the broker pick one; the fourth argument (exclusive) is true.
// Only the first element of the result, the queue name, is needed.
list($queueName) = $ch->queue_declare("", false, false, true, false);

// Attach the queue to the exchange.
$ch->queue_bind($queueName, 'logs');

echo ' [*] Waiting for logs. To exit press CTRL+C' . "\n";

// Handler invoked once per delivered log message.
$onLog = function ($delivery) {
    echo ' [x] ', $delivery->body, "\n";
};

// Fourth argument (no_ack) is true, so no manual ack is sent.
$ch->basic_consume($queueName, '', false, true, false, false, $onLog);

// Keep dispatching deliveries for as long as a consumer callback is registered.
while (count($ch->callbacks) > 0) {
    $ch->wait();
}

$ch->close();
$conn->close();

    4.Routing

   

<?php
// Routing producer: publishes a message to the "direct_logs" direct exchange,
// using the severity as the routing key.
// Usage: php <this script> [severity] [message words...]
//        severity defaults to "info", message defaults to "Hello World!".
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// Declare a direct exchange named "direct_logs" (passive, durable, auto_delete all false).
$channel->exchange_declare('direct_logs', 'direct', false, false, false);

// empty() tolerates a missing $argv[1], avoiding an undefined-offset notice
// when the script is run without arguments.
$severity = !empty($argv[1]) ? $argv[1] : "info";

// Everything after the severity forms the message body.
$data = implode(' ', array_slice($argv, 2));
// Compare against '' rather than empty() so that a payload of "0" is still sent.
if ($data === '') $data = "Hello World!";

$msg = new AMQPMessage($data);

// The severity is the routing key.
$channel->basic_publish($msg, 'direct_logs', $severity);

echo " [x] Sent ",$severity,':',$data," \n";

$channel->close();
$connection->close();

?>

    log_direct_client.php

    

<?php
// Routing consumer: binds a broker-named queue to the "direct_logs" exchange once
// per severity given on the command line, then prints each received message
// prefixed by its routing key.
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPConnection;

$conn = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$ch = $conn->channel();

// Declare the same direct exchange the publisher uses.
$ch->exchange_declare('direct_logs', 'direct', false, false, false);

// Empty name lets the broker pick one; the fourth argument (exclusive) is true.
// Only the first element of the result, the queue name, is needed.
list($queueName) = $ch->queue_declare("", false, false, true, false);

// Severities to subscribe to are all arguments after the script name.
$wanted = array_slice($argv, 1);
if (!$wanted) {
    // At least one severity is required: print usage to stderr and stop.
    file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n");
    exit(1);
}

// One binding per severity, with the severity as the binding key.
foreach ($wanted as $level) {
    $ch->queue_bind($queueName, 'direct_logs', $level);
}

echo ' [*] Waiting for logs. To exit press CTRL+C' . "\n";

// Handler invoked once per delivered message.
$onLog = function ($delivery) {
    echo ' [x] ', $delivery->delivery_info['routing_key'], ':', $delivery->body, "\n";
};

// Fourth argument (no_ack) is true, so no manual ack is sent.
$ch->basic_consume($queueName, '', false, true, false, false, $onLog);

// Keep dispatching deliveries for as long as a consumer callback is registered.
while (count($ch->callbacks) > 0) {
    $ch->wait();
}

$ch->close();
$conn->close();

?>

相关推荐