基于RabbitMq系统重构
一、RabbitMQ简介
官网:http://www.rabbitmq.com/
rabbitMQ是AMQP用Erlang实现的MQAMQP主要是由金融领域的软件专家们贡献的创意,而联合了通讯和软件方面的力量,一起打造出来的规范。只要遵循AMQP的协议,任何一种语言都可以开发消息组件乃至中间件本身。我们之前使用的activeMQ是实现了jms接口,只能在java环境使用。
AMQP协议是一种二进制协议,提供客户端应用与消息中间件之间异步、安全、高效地交互。从整体来看,AMQP协议可划分为三层:
消息中间件的主要功能是消息的路由(Routing)和缓存(Buffering)。在AMQP中提供类似功能的两种域模型:Exchange和Messagequeue。
二、基本概念
如上图所示rabbitmq主要包括四部分组成
1,P代表生产者,C代表消费者,Xexchange交换器,Q红色的表示队列
2,P和C都是在客户端,X和Q在服务器端。
3,发送接收的大概流程是,P发送消息至交换器。
4,C先声明一个队列,然后将队列和交换器绑定,接着接受消息。
5,换句话说,P和C互相不知道对方存在。
三、RabbitMQServer安装
1,下载地址:http://www.rabbitmq.com/download.html,支持多种操作系统:
*Windows:Withinstaller(recommended)|Manual
*Linux/Unix:Debian/Ubuntu|Fedora/RHEL|GenericUnix|Solaris
*MacOSX:Standalone|GenericUnix|Macports|Homebrew
*Cloud:EC2
三、RabbitMQ原生代码实例
1,交换器类型:direct,topic,headers和fanout
Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息
fanout
所有bind到此exchange的queue都可以接收消息
direct
通过routingKey和exchange决定的那个唯一的queue可以接收消息
topic
所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
表达式符号说明:#代表一个或多个字符,*代表任何字符
例:#.a会匹配a.a,aa.a,aaa.a等
*.a会匹配a.a,b.a,c.a等
注:使用RoutingKey为#,ExchangeType为topic的时候相当于使用fanout
Headers
类型的exchange使用的比较少,它也是忽略routingKey的一种路由方式。是使用Headers来匹配的。Headers是一个键值对,可以定义成Hashtable。发送者在发送的时候定义一些键值对,接收者也可以再绑定时候传入一些键值对,两者匹配的话,则对应的队列就可以收到消息。匹配有两种方式all和any。这两种方式是在接收端必须要用键值"x-mactch"来定义。all代表定义的多个键值对都要满足,而any则代码只要满足一个就可以了。
参考博文:http://www.cnblogs.com/haoxinyue/archive/2012/09/28/2707041.html
实例内容:点对点,工作队列,订阅/发布,路由,主题,RPC
官网实例:http://www.rabbitmq.com/getstarted.html
中文翻译:http://adamlu.net/dev/2011/09/rabbitmq-get-started/
四,spring-amqp
官网:http://projects.spring.io/spring-amqp/
SpringAMQP是基于Spring框架的AMQP消息解决方案,提供模板化的发送和接收消息的抽象层,提供基于消息驱动的POJO。
maven引用:
<dependencies><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId><version>1.2.0.RELEASE</version></dependency></dependencies>
实例:http://docs.spring.io/spring-amqp/reference/html/sample-apps.html
五、tbss与spring整合的日志记录模块
1,虚拟机安装了rabbitMQ,ip:192.168.17.130默认端口:5672
2,log项目结构,如下图:
图1:原来的facade接口,对外供webservice的接口;
图2:增加基于rabbitmq队列的日志消费者服务项目
图3:集成单元测试:主要是LogProducter与spring中的log-rabbit.xml配置为模拟日志生产者
3,采用rabbitmq重构的目的
3.1建立星形的中心处理结构,解决原来webservice的网状部署分布;
3.2日志服务利用rabbitmqbuffer定义可进行持久,对于日志记录(消费者)状态不关心,即透明,不会因为日志记录服务器宕机而导致主业务回滚或者日志丢失;
4,源代码分析
4.1log-server
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>log-parent</artifactId> <groupId>com.yolema.log</groupId> <version>1.0.0.20120828</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>log-server</artifactId> ... <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>${spring.amqp.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${spring.version}</version> </dependency> ... </project>
LogConsumer.java
package com.yolema.log.server; import com.yolema.log.core.biz.manager.LoginLogManager; import com.yolema.log.core.biz.manager.OperateLogManager; import com.yolema.log.core.model.entity.LoginLog; import com.yolema.log.core.model.entity.OperateLog; import com.yolema.log.core.model.enums.LogResultEnum; import com.yolema.log.core.model.exceptions.LogException; import com.yolema.log.core.model.results.GenericsResult; import com.yolema.log.core.vo.operate.LogQueryVo; import com.yolema.log.ext.facade.order.LogQueryOrder; import com.yolema.log.ext.facade.results.LogExtResult; import com.yolema.log.ext.facade.sdo.LoginLogSdo; import com.yolema.log.ext.facade.sdo.OperateLogSdo; import com.youlema.tools.jee.beanutils.BeanGeneralConvertor; import com.youlema.tools.jee.converter.BeanConverter; import com.youlema.tools.jee.pages.PageList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.AmqpAdmin; import org.springframework.beans.factory.annotation.Autowired; /** * 日志记录消费者 * <p/> * User: sys53 * Date: 13-12-4 下午4:03 * version $Id: LogProducter.java, v 0.1 Exp $ */ public class LogConsumer { private final static Logger LOG = LoggerFactory.getLogger(LogConsumer.class); @Autowired private LoginLogManager loginLogManager; @Autowired private OperateLogManager operateLogManager; /** * 记录一条登录日志 * * @return */ public void recordLoginLog(LoginLogSdo loginLogSdo) { try { if (loginLogSdo == null) { LOG.info("记录登录日志参数不能为null"); throw new LogException(LogResultEnum.ARGS_CAN_NOT_BE_NULL); } GenericsResult<LoginLog> result = loginLogManager.insert(BeanConverter.convert( new LoginLog(), loginLogSdo)); if (!result.isSuccess()) { throw new LogException(result.getResultCode(), result.getResultMsg()); } } catch (LogException e) { LOG.error("记录登录日志失败,出现已知异常", e); } catch (Exception e) { LOG.error("记录登录日志失败,出现未知异常", e); } } /** * 记录一条操作日志 */ public void recordOperateLog(OperateLogSdo operateLogSdo) { try { if (operateLogSdo == null) { LOG.info("记录操作日志参数不能为null"); throw new LogException(LogResultEnum.ARGS_CAN_NOT_BE_NULL); } GenericsResult<OperateLog> result = operateLogManager.insert(BeanConverter.convert( new OperateLog(), operateLogSdo)); if (!result.isSuccess()) { throw new LogException(result.getResultCode(), result.getResultMsg()); } } catch (LogException e) { LOG.error("记录操作日志失败,出现已知异常", e); } catch (Exception e) { LOG.error("记录操作日志失败,出现未知异常", e); } } /** * 查询操作日志列表 * @param logQueryOrder * @return */ public LogExtResult queryOperateLog(LogQueryOrder logQueryOrder){ LogExtResult extResult = new LogExtResult(true); try { GenericsResult<PageList<OperateLog>> result = operateLogManager .queryPageList(BeanConverter.convert(new LogQueryVo(), logQueryOrder)); if (!result.isSuccess()) { throw new LogException(result.getResultCode(), result.getResultMsg()); } extResult.setOperateLogList(BeanGeneralConvertor.convertBeanList2BeanList( OperateLogSdo.class, result.getResultData())); } catch (LogException e) { LOG.error("查询操作日志失败,出现已知异常", e); extResult.setSuccess(false); extResult.setResultCode(e.getResultCode()); extResult.setResultMsg(e.getResultMsg()); } catch (Exception e) { LOG.error("查询操作日志失败,出现未知异常", e); extResult.setSuccess(false); extResult.setResultMsg(e.getMessage()); } return extResult; } }
log-rabbit.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"> <rabbit:connection-factory id="connectionFactory" host="192.168.17.130" username="guest" password="guest"/> <rabbit:admin connection-factory="connectionFactory"/> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="logExchange" routing-key="log.service"/> <rabbit:queue name="log.queue" /> <rabbit:topic-exchange name="logExchange"> <rabbit:bindings> <rabbit:binding queue="log.queue" pattern="log.*" /> </rabbit:bindings> </rabbit:topic-exchange> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener ref="logConsumer" method="recordLoginLog" queue-names="log.queue" /> <rabbit:listener ref="logConsumer" method="recordOperateLog" queue-names="log.queue" /> <rabbit:listener ref="logConsumer" method="queryOperateLog" queue-names="log.queue" /> </rabbit:listener-container> <bean id="logConsumer" class="com.yolema.log.server.LogConsumer" /> </beans>
在spring配置文件中定义由于主题交换类的监听设置
1,rabbit:connection-factory,定义:192.168.17.130的connectionFactory
2,rabbit:template,定义rabbitTemplate,通过connectionFactory,交换机名为:logExchange,routing-key:log.service
3,定义队列名:log.queue
4,rabbit:topic-exchange定义交机器,并rabbit:binding绑定在队列:log.queue,匹配所有routing-key是:log.开头的
5,beanid="logConsumer”定义普通javabean,定义委托监听的执行方法
6,rabbit:listener-container定义在connectionFactory上的监听容器,rabbit:listener定义队列og.queue来的消息采用哪一个委托类方法;
(需要测试的问题,如果同一个队列中,不同的方法,参数是一样的话,可能会同时执行)
LogServer.java
package com.yolema.log.server; import org.springframework.context.support.ClassPathXmlApplicationContext; /** * 日志服务 * <p/> * User: sys53 * Date: 13-12-4 下午4:02 * version $Id: LogServer.java, v 0.1 Exp $ */ public class LogServer { public static void main(String[] args){ new ClassPathXmlApplicationContext("classpath:/log-*.xml"); } } 直接调用spring架框就可以正常启动rabbitMQ监听容器 4.2生产者(junit单元测试代码) LogProducter.java package com.yolema.log.test.integration; import com.yolema.log.ext.facade.order.LogQueryOrder; import com.yolema.log.ext.facade.results.LogExtResult; import com.yolema.log.ext.facade.sdo.LoginLogSdo; import com.yolema.log.ext.facade.sdo.OperateLogSdo; import org.junit.Assert; import org.junit.Test; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import java.util.Date; /** * Write class comments here * <p/> * User: sys53 * Date: 13-12-4 下午5:12 * version $Id: com.yolema.log.test.integration.LogProducter.java, v 0.1 Exp $ */ public class LogProducter extends BaseTest{ @Autowired private AmqpTemplate amqpTemplate; @Test public void recordLoginLog(){ LoginLogSdo sdo = new LoginLogSdo(); sdo.setClientArea("杭州"); sdo.setClientBrowser("ie"); sdo.setClientIp("127.0.0.1"); sdo.setClientOs("xp"); sdo.setLogContent("登录成功"); sdo.setLogTarget("2"); sdo.setLoginType("1"); sdo.setLogTime(new Date()); sdo.setLogResult("SUCCESS"); amqpTemplate.convertAndSend(sdo); } @Test public void recordOperateLog(){ OperateLogSdo sdo = new OperateLogSdo(); sdo.setLogTarget("tbss"); sdo.setLogTime(new Date()); sdo.setLogLvl("1"); sdo.setUrl("http://localhost:8080/tbss"); sdo.setLogContent("rabbitmq测试增加用户"); sdo.setSystemCode("tbss-code"); sdo.setSystemName("tbss-all"); sdo.setModelCode("user"); sdo.setModelName("用户"); sdo.setClientIp("192.168.0.252"); sdo.setClientBrowser("IE"); sdo.setClientArea("杭州"); sdo.setClientOs("mac os x"); amqpTemplate.convertAndSend(sdo); } @Test public void queryOperateLog(){ LogQueryOrder logQueryOrder = new LogQueryOrder(); LogExtResult logExtResult =(LogExtResult) amqpTemplate.convertSendAndReceive(logQueryOrder); Assert.assertTrue(logExtResult.isSuccess()); System.out.println(logExtResult.getOperateLogList().size()); } }
1,直接承继BaseTest的集成测试架构(默认定义spring配置)
2,定义三个测试类,发送三个不同参数
2.1recordLoginLog()发送:amqpTemplate.convertAndSend(sdo)参数为LoginLogSdo类
2.2recordOperateLog()发送:amqpTemplate.convertAndSend(sdo),参数为OperateLogSdo类
2.3queryOperateLog(),同步等待队列返回结果amqpTemplate.convertSendAndReceive(logQueryOrder),参数LogQueryOrder类
log-rabbit.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"> <rabbit:connection-factory id="connectionFactory" host="192.168.17.130" username="guest" password="guest"/> <rabbit:admin connection-factory="connectionFactory"/> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" queue="log.queue" exchange="logExchange" routing-key="log.service"/> </beans>
rabbit:template定义amqpTemplate,在LogProducter中可以自动注入,connectionFactory同消费者端
六、TBSS系统中非核心业务的架构