关于ActiveMQ集群

1前提

1.下载jdk6(update24),解压,安装,下面用$java_dir$表示JDK主目录

2.下载ActiveMQ5.4.2,解压,下面用$activemq_dir$表示activeMQ主目录

3.下载AapcehANT1.8,解压,下面用$ant_dir$表示ANT主目录

4.配置好环境变量

1)Java_home:指向$java_dir$

2)Ant_home:指向$ant_dir$

3)Path:应包含%java_home%/bin

5.将MySQL驱动包放置到$activemq_home$/lib下(本例用到MySQL数据源)

2解决单点故障:PureMasterSlave

2.1概述

1.Master:给broker取个名字,修改其持久化KAHADB文件

2.Slave:给broker取个名字,修改其持久化KAHADB文件,需要配置Master的地址和端口

3.一个Master只能带一个Slave

4.Master工作期间,会将消息状况自动同步到Slave

5.Master一旦崩溃,Slave自动接替其工作,已发送并尚未消费的消息继续有效

6.Slave接手后,必须停止Slave才能重启先前的Master

2.2MQ配置(这里是一台机器配置)

1.Master:首先复制$activemq_dir$/conf/activemq.xml,并改名为:pure_master.xml,修改文件

1)<brokerbrokername="pure_master"…

2)<kahaDBdirectory="${activemq.base}/data/kahadb_pure_master"/>

3)<transportConnectorname="openwire"uri="tcp://0.0.0.0:61616"/>

2.Slave:首先复制$activemq_dir$/conf/activemq.xml,并改名为:pure_slave.xml,修改文件

1)<brokerbrokername="pure_slave"masterConnectorURI="tcp://0.0.0.0:61616"

shutdownOnMasterFailure="false"…

2)<kahaDBdirectory="${activemq.base}/data/kahadb_pure_slave"/>

3)<transportConnectorname="openwire"uri="tcp://0.0.0.0:61617"/>

3.首先启动Master,启动完毕后在另一个Shell启动Slave,Slave启动后,可以看到Master那个Shell中显示已经Attach上了Slave

1)启动Master:$activemq_dir$/bin>activemqxbean:file:../conf/pure_master.xml

2)启动Slave:$activemq_dir$/bin>activemqxbean:file:../conf/pure_slave.xml

2.3JAVA测试:队列

1.生产者

publicstaticvoidmain(String[]args)throwsException{

ConnectionFactorycf=newActiveMQConnectionFactory("failover:(tcp://0.0.0.0:61616,tcp://0.0.0.0:61617)");

Connectionconn=cf.createConnection();

conn.start();

Sessionsess=conn.createSession(false,Session.AUTO_ACKNOWLEDGE);

Queueqq=newActiveMQQueue("qq1");

MessageProducerprod=sess.createProducer(qq);

Messagemsg=null;

Scannerscan=newScanner(System.in);

Stringstr=scan.next();

while(true){

msg=sess.createTextMessage(str);

prod.send(msg);

if(str.equals("exit")){

break;

}

str=scan.next();

}

conn.close();

}

2.消费者

publicstaticvoidmain(String[]args)throwsException{

ConnectionFactorycf=newActiveMQConnectionFactory("failover:(tcp://0.0.0.0:61616,tcp://0.0.0.0:61617)");

Connectionconn=cf.createConnection();

conn.start();

Sessionsess=conn.createSession(false,Session.AUTO_ACKNOWLEDGE);

Queueqq=newActiveMQQueue("qq1");

MessageConsumercs=sess.createConsumer(qq);

TextMessagemsg=(TextMessage)cs.receive();

Stringstr=msg.getText();

while(true){

System.out.println("receivemsg:/t"+msg.getText());

if(str.equals("exit")){

break;

}

msg=(TextMessage)cs.receive();

str=msg.getText();

}

conn.close();

}

2.4测试步骤

1.启动生产者,发送几条消息

2.启动消费者,可看到接收到的消息

3.关闭消费者

4.生产者继续发送几条消息—消息A

5.停止Master(可看到生产者端显示连接到Slave(tcp://0.0.0.0:61617)了)

6.生产者继续发送几条消息—消息B

7.启动消费者

8.消费者接收了消息A和消息B,可见Slave接替了Master的工作,而且储存了之前生产者经过Master发送的消息

2.5结论

PureMasterSlave模式实现方式简单,可以实现消息的双机热备功能;队列可以实现消息的异步和点对点发送

3解决单点故障:JDBCMasterSlave

3.1概述

1.配置上,不存在Master和Slave,所有Broder的配置基本是一样的

2.多个共享数据源的Broker构成JDBCMasterSlave

3.给每个Broker取个名字

4.首先抢到资源(数据库锁)的Broker成为Masetr

5.其他Broker保持预备状态,定期尝试抢占资源,运行其的Shell中清楚的显示了这一点

6.一旦Master崩溃,其他Broker尝试抢占资源,最终只有一台抢到,它立刻成为Master

7.之前的Master即使重启成功,也只能作为Slave等待

3.2MQ配置(这里是一台机器配置)

1.将$activemq_dir$/conf/activemq.xml复制3份,分别改名为:jdbc_broker01.xml、jdbc_broker02.xml、jdbc_broker03.xml

2.首先修改jdbc_broker01.xml

1)<brokerbrokername="jdbc_broker01"…

2)<!--配置持久适配器-->

<persistenceAdapter>

<jdbcPersistenceAdapterdataDirectory="${activemq.base}/data"

dataSource="#mysql-ds"/>

</persistenceAdapter>

3)<transportConnectorname="openwire"uri="tcp://0.0.0.0:61616"/>

4)<!--配置数据源:注意是在broker标记之外-->

</broker>

<beanid="mysql-ds"class="org.apache.commons.dbcp.BasicDataSource"

destroy-method="close">

<propertyname="driverClassName"value="com.mysql.jdbc.Driver"/>

<propertyname="url"

value="jdbc:mysql://localhost/test?relaxAutoCommit=true"/>

<propertyname="username"value="root"/>

<propertyname="password"value="root"/>

<propertyname="maxActive"value="200"/>

<propertyname="poolPreparedStatements"value="true"/>

</bean>

3.同样修改jdbc_broker02.xml,与jdbc_broker01.xml不同之处

1)<brokerbrokername="jdbc_broker02"…

2)<transportConnectorname="openwire"uri="tcp://0.0.0.0:61617"/>

4.同样修改jdbc_broker03.xml,与jdbc_broker01.xml不同之处

1)<brokerbrokername="jdbc_broker03"…

2)<transportConnectorname="openwire"uri="tcp://0.0.0.0:61618"/>

3.3JAVA测试:队列

1.代码基本和前面一致,只是因为这里有3个broker,所以创建连接工厂的代码稍有差别:

ConnectionFactorycf=newActiveMQConnectionFactory("failover:(tcp://0.0.0.0:61616,tcp://0.0.0.0:61617,tcp://0.0.0.0:61618)");

3.4测试步骤

1.先启动生产者,发送几条消息

2.启动消费者,可看到接收到的消息

3.关闭消费者

4.生产者继续发送几条消息-消息A

5.停止broker01(可看到生产者端显示连接到broker02(tcp://0.0.0.0:61617)了,同时运行broker02的Shell也显示其成为了Master)

6.生产者继续发送几条消息-消息B

7.启动消费者

8.消费者接收了消息A和消息B,可见broker02接替了broker01的工作,而且储存了之前生产者经过broker01发送的消息

9.关闭消费者

10.生产者继续发送几条消息-消息C

11.停止broker02(可看到生产者端显示连接到broker03(tcp://0.0.0.0:61618)了,同时运行broker03的Shell也显示其成为了Master)

12.生产者继续发送几条消息-消息D

13.启动消费者

14.消费者接收了消息C和消息D,可见broker03接替了broker02的工作,而且储存了之前生产者经过broker02发送的消息

15.再次启动broker01,生产者或消费者均未显示连接到broker01(tcp://0.0.0.0:61616),表明broker01此时只是个Slave

3.5结论

JDBCMasterSlave模式实现方式稍微复杂一点,可以实现消息的多点热备功能,Master、Slave的交替完全是即时的,自动的,无需重启任何broker;队列可以实现消息的异步和点对点发送

相关推荐