(转)消息处理利器 ActiveMQ 的介绍 & Stomp 协议的使用 PHP demo
随 着互联网企业业务量的不断扩大,企业信息网络系统的愈加复杂,性能问题也就越来越凸显出来,串行的业务处理方式显然已经成为主要的瓶颈,我们需要更多 异 步的并行处理来提高企业信息系统的业务处理能力,因此独立的消息处理系统也就应运而生,ActiveMQ 就是诸多开源消息系统的佼佼者。对于我们 的技术选型来说,稳定和适应性是最重要的考虑因素,因此由 Apache 组织背景而且支持发布/订阅(Pub/Sub)模式以及异步 Stomp 协 议(Streaming Text Orientated Messaging Protocol)和 REST 方式的 ActiveMQ 就成为 了首选的消息处理器,经测试在异步模式下,整个系统的信息处理吞吐量还是很理想的(一台普通服务器能达到每秒收发 4000 个消息,每个消 息 4K 字节这样的速度)。
下面我们来看看 ActiveMQ 的基本配置与使用,由于我们公司大部分的内部服务都是使用脚本语言写 的,比如 PHP/Python 等,所以我们使用 Stomp 协议来处理;另外,为了尽可能提高消息“生产者”的效率,我们采用了异 步 nio 模式;还有关于并发处理以及负载均衡方面还有很多需要注意的配置和选项,我们后面会另找时间补充。
1、下载 ActiveMQ(最新版本 5.4.1)
2、解压安装到 /usr/local/activemq 目录下
3、安装初始化配置文件 ./bin/activemq setup /etc/default/activemq (保存至默认位置,以后可微调启动参数)
4、编辑 ./conf/activemq.xml 加入如下段(authentication / stomp+nio):
...
<plugins>
<!-- Add By James ; Configure authentication; Username, passwords and groups -->
<simpleAuthenticationPlugin>
<users>
<authenticationUser username="system" password="${activemq.password}" groups="users,admins"/>
<authenticationUser username="user" password="${guest.password}" groups="users"/>
<authenticationUser username="guest" password="${guest.password}" groups="guests"/>
</users>
</simpleAuthenticationPlugin>
</plugins>
...
<!-- Add by James for support stomp protocol -->
<transportConnector name="stomp+nio" uri="stomp+nio://0.0.0.0:61613?transport.closeAsync=false"/>
...
5、使用 Stomp 库对 ActiveMQ 进行收发消息测试(这里不提供 Java 的例子,因为 Google 上已经有太多了,我们以 PHP 作为脚本语言的程序范例)。
a> 以下是 Stomp 协议的简单命令:
* SEND
* SUBSCRIBE
* UNSUBSCRIBE
* BEGIN
* COMMIT
* ABORT
* ACK
* DISCONNECT
b> PHP 实例(transaction / persistent / ack)
关于 Stomp 的 PHP 库可以到 Google Code 上下载,使用以下测试用例的时候可以配合 http://activemq-hostname:8161/admin/ 查看消息的数量和状态。
c> amq_sent.php(生产者示例)
...
// include a library
require_once("Stomp.php");
// make a connection
$con = new Stomp("tcp://192.168.1.11:61613");
// connect
$con->connect("guest", "password");
// begin transaction
$con->begin("tx1");
try {
// build a message by map
require_once("Stomp/Message/Map.php");
$msgBody = array("city"=>"Belgrade", "name"=>"Dejan");
$msgHeader["persistent"] = "true"; // VERY IMPORTANT : Because By default, Stomp produced messages are set to non-persistent.
$msgHeader["transformation"] = "jms-map-json";
$mapMessage = new StompMessageMap($msgBody, $msgHeader);
// send the message to the queue
$con->send("/queue/test", $mapMessage, array("transaction" => "tx1"));
// test rollback logic
// throw new Exception("Sending failed ...");
// commit sending message
$con->commit("tx1");
// print message
echo "Sent message : ";
print_r($msgBody);
} catch (Exception $e) {
// rollback sending
$con->abort("tx1");
// print message
echo $e->getMessage();
}
// disconnect
$con->disconnect();
...
d> amq_recv.php(消费者示例)
...
// include a library
require_once("Stomp.php");
// make a connection
$con = new Stomp("tcp://192.168.1.11:61613");
// connect with authentication
$con->connect("guest", "password");
// set read timeout
$con->setReadTimeout(1);
// subscribe to the queue
$con->subscribe("/queue/test", array("transformation" => "jms-map-json"));
// receive a message from the queue
$msg = $con->readFrame();
// do what you want with the message
if ( $msg != null) {
echo "Received message : ";
print_r($msg);
// mark the message as received in the queue
$con->ack($msg);
} else {
echo "Failed to receive a message/n";
}
// disconnect
$con->disconnect();
...
6、 目前 ActiveMQ 最新版的消息数据是保存在一个快速的文本数据库 kahadb 里面的,使用虽然速度很快但是一旦出错恢复起来总是有很多的问 题。所以我建议大家还是把主流数据库的持久化选项打开,虽然慢一点但是数据至少恢复起来要简 单不少,我们使用 Mysql 来保存持久化的消息数据,数 据库是 activemq,配置在 ./conf/activemq.xml 中,如下:
...
<persistenceFactory>
<journalPersistenceAdapterFactory dataDirectory="${activemq.base}/data" dataSource="#mysql-ds"/>
</persistenceFactory>
...
<!-- MySql DataSource Sample Setup -->
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://192.168.1.10:11811/activemq?relaxAutoCommit=true"/>
<property name="username" value="admin"/>
<property name="password" value="ihush2010"/>
<property name="maxActive" value="200"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
...
总结&注意事项:
1、Stomp 协议构造的消息默认是非持久化的,如果要让消息持久化必须加上 "persistent:true" 的消息头,这个搞了我半天时间~
2、编程时候还是需要注意“消费者”的运行效率,因为大量“慢消费者”会在非持久的 Topics 上出现问题,特别在多消费者的情况下,容易造成 ActiveMQ 的反应变慢~
3、如果不是使用 Stomp 协议和 ActiveMQ 建立通讯的情况下尽量使用长连接,Connection 的 start() 和 stop() 方法代价很高。
4、如果使用集群 master-slave 也是可以的,但是总是不够稳定,建议还是使用单台服务器的模式。