Dubbox+Redis3.0+Spring+Hibernate+zookeeper实现消息推送核心搭建
这里在开始搭建环境之前请搭建先熟悉了解Dubbox、Redis、Zookeeper.... 并且安装Redis、Zookeeper
下面进入主题首先我们需要下载Dubbox相关代码链接地址:https://github.com/dangdangdotcom/dubbox这里使用Eclipse git工具下载源代码 这里我们具体使用的是 dubbo-demo-provider这个项目 就是服务提供者 这个项目是maven项目在构建war包的时候需要依赖 dubbo、dubbo-demo-api(这个api项目为什么需要依赖我还没找到原因有朋友知道请告知) 好了 我们来编译dubbo-demo-provider这个项目 主要最好把这个项目单独拷出来引入Eclipse工程中 加入我们已经安装maven环境 。首先进入cmd 到项目所在路径下 比如我的:G:\WokerSpace\dubbo-demo-provider 然后 mvn clean compile 然后 mvn clean install 如果项目没有错误会直接打包成war既可以放到tomcat下部署 注意:tomcat环境需要搭建好。当然我们也可以通过Eclipse来编译和安装 。这里Eclipse还提供直接运行maven项目到 tomcat 右击项目 run as Run on Server或者 Debug on Server 启动项目这样启动项目好处是可以断点调试。当然我们需要安装Zookeeper 具体安装参照:http://blog.csdn.net/laoge121/article/details/16359637 这篇文章讲述了Zookeeper dubbox 、spring集成重点看Zookeeper 安装
集成Spring 、Hibernate
首先在WEB-INF下面新建
appConfig.properties 配置如下 是jdbc和redis相关配置
########################\u6570\u636E\u5E93\u8FDE\u63A5\u4FE1\u606F#############
jdbc.username = root
jdbc.password = 123456
jdbc.url = jdbc:mysql://localhost:3306/tcwyonline?useUnicode=true&characterEncoding=UTF-8
jdbc.driver = com.mysql.jdbc.Driver
#\u6700\u5927\u5206\u914D\u7684\u5BF9\u8C61\u6570
redis.pool.maxActive=1024
#\u6700\u5927\u80FD\u591F\u4FDD\u6301idel\u72B6\u6001\u7684\u5BF9\u8C61\u6570
redis.pool.maxIdle=200
#\u5F53\u6C60\u5185\u6CA1\u6709\u8FD4\u56DE\u5BF9\u8C61\u65F6\uFF0C\u6700\u5927\u7B49\u5F85\u65F6\u95F4
redis.pool.maxWait=1000
#\u5F53\u8C03\u7528borrow Object\u65B9\u6CD5\u65F6\uFF0C\u662F\u5426\u8FDB\u884C\u6709\u6548\u6027\u68C0\u67E5
redis.pool.testOnBorrow=true
#IP
redis.ip=192.168.1.195
#Port
redis.port=6379
applicationContext.xml配置如下 相关配置用途已加以说明
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://www.springframework.org/schema/beans" xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:cache="http://www.springframework.org/schema/cache" xmlns:p="http://www.springframework.org/schema/p"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache-4.0.xsd">
<!-- 引入properties文件 -->
<context:property-placeholder location="/WEB-INF/appConfig.properties" />
<!-- <context:property-placeholder location="/WEB-INF/*.properties"/>-->
<!-- 定义数据库连接池数据源bean destroy-method="close"的作用是当数据库连接不使用的时候,就把该连接重新放到数据池中,方便下次使用调用 -->
<bean id="dataSource" class="com.mchange.v2.c3p0.ComboPooledDataSource"
destroy-method="close">
<!-- 设置JDBC驱动名称 -->
<property name="driverClass" value="${jdbc.driver}" />
<!-- 设置JDBC连接URL -->
<property name="jdbcUrl" value="${jdbc.url}" />
<!-- 设置数据库用户名 -->
<property name="user" value="${jdbc.username}" />
<!-- 设置数据库密码 -->
<property name="password" value="${jdbc.password}" />
<!-- 设置连接池初始值 -->
<property name="initialPoolSize" value="5" />
</bean>
<!-- 配置sessionFactory -->
<bean id="sessionFactory"
class="org.springframework.orm.hibernate4.LocalSessionFactoryBean">
<!-- 数据源 -->
<property name="dataSource" ref="dataSource" />
<!-- hibernate的相关属性配置 -->
<property name="hibernateProperties">
<value>
<!-- 设置数据库方言 -->
hibernate.dialect=org.hibernate.dialect.MySQLDialect
<!-- 设置自动创建|更新|验证数据库表结构 -->
hibernate.hbm2ddl.auto=update
<!-- 是否在控制台显示sql -->
hibernate.show_sql=true
<!-- 是否格式化sql,优化显示 -->
hibernate.format_sql=true
<!-- 是否开启二级缓存 -->
hibernate.cache.use_second_level_cache=false
<!-- 是否开启查询缓存 -->
hibernate.cache.use_query_cache=false
<!-- 数据库批量查询最大数 -->
hibernate.jdbc.fetch_size=50
<!-- 数据库批量更新、添加、删除操作最大数 -->
hibernate.jdbc.batch_size=50
<!-- 是否自动提交事务 -->
hibernate.connection.autocommit=true
<!-- 指定hibernate在何时释放JDBC连接 -->
hibernate.connection.release_mode=auto
<!-- 创建session方式 hibernate4.x 的方式 -->
hibernate.current_session_context_class=org.springframework.orm.hibernate4.SpringSessionContext
<!-- javax.persistence.validation.mode默认情况下是auto的,就是说如果不设置的话它是会自动去你的classpath下面找一个bean-validation**包
所以把它设置为none即可 -->
javax.persistence.validation.mode=none
</value>
</property>
<!-- 自动扫描实体对象 tdxy.bean的包结构中存放实体类 -->
<property name="packagesToScan" value="com.tcwy" />
</bean>
<!-- 定义事务管理 -->
<bean id="transactionManager"
class="org.springframework.orm.hibernate4.HibernateTransactionManager">
<property name="sessionFactory" ref="sessionFactory" />
</bean>
<!-- 定义 Autowired 自动注入 bean -->
<bean class="org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor"/>
<!-- 扫描有注解的文件 base-package 包路径 -->
<context:component-scan base-package="com.tcwy"/>
<tx:advice id="txAdvice" transaction-manager="transactionManager">
<tx:attributes>
<!-- 事务执行方式
REQUIRED:指定当前方法必需在事务环境中运行,
如果当前有事务环境就加入当前正在执行的事务环境,
如果当前没有事务,就新建一个事务。
这是默认值。
-->
<tx:method name="send*" propagation="REQUIRED" />
<tx:method name="create*" propagation="REQUIRED" />
<tx:method name="save*" propagation="REQUIRED" />
<tx:method name="add*" propagation="REQUIRED" />
<tx:method name="update*" propagation="REQUIRED" />
<tx:method name="remove*" propagation="REQUIRED" />
<tx:method name="del*" propagation="REQUIRED" />
<tx:method name="import*" propagation="REQUIRED" />
<!--
指定当前方法以非事务方式执行操作,如果当前存在事务,就把当前事务挂起,等我以非事务的状态运行完,再继续原来的事务。
查询定义即可
read-only="true" 表示只读
-->
<tx:method name="*" propagation="NOT_SUPPORTED" read-only="true" />
</tx:attributes>
</tx:advice>
<!-- 定义切面,在 * tdxy.*.service.*ServiceImpl.*(..) 中执行有关的hibernate session的事务操作 -->
<aop:config>
<aop:pointcut id="serviceOperation" expression="execution(* com.tcwy.service.*.impl.*.*(..))" />
<aop:advisor advice-ref="txAdvice" pointcut-ref="serviceOperation" />
</aop:config>
<!--启动spring事务注解功能-->
<tx:annotation-driven transaction-manager="txAdvice"/>
<!-- spring 定时器 -->
<bean id="messagetask" class="com.tcwy.service.taskservice.impl.TaskServiceImp">
</bean>
<!-- 间隔2秒钟执行任务 -->
<bean id="messagetaskTopSaleJobDetail" class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">
<property name="targetObject" ref="messagetask"></property>
<property name="targetMethod" value="work"></property>
</bean>
<bean id="messagetaskTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean">
<property name="jobDetail">
<ref bean="messagetaskTopSaleJobDetail"/>
</property>
<property name="cronExpression">
<value>*/5 * * * * ?</value> <!-- 0 */1 * * * ? 1分钟 -->
</property>
</bean>
<bean class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
<property name="triggers">
<list>
<ref bean="messagetaskTrigger"/>
</list>
</property>
</bean>
<!-- redis -->
<bean
id="jedisPoolConfig"
class="redis.clients.jedis.JedisPoolConfig">
</bean>
<bean
id="jedisConnectionFactory"
class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
<property
name="hostName"
value="${redis.ip}" />
<property
name="port"
value="${redis.port}" />
<property
name="poolConfig"
ref="jedisPoolConfig" />
</bean>
<!-- <bean class="org.springframework.data.redis.core.RedisTemplate"
p:connection-factory-ref="jedisConnectionFactory" />
-->
<bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
<property name="connectionFactory" ref="jedisConnectionFactory" />
</bean>
<bean id="stringRedisTemplate" class="org.springframework.data.redis.core.StringRedisTemplate">
<property name="connectionFactory" ref="jedisConnectionFactory" />
</bean>
<bean id="redisSerializer" class="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer"/>
<bean id="hibernateTemplate" class="org.springframework.orm.hibernate4.HibernateTemplate">
<property name="sessionFactory" ref="sessionFactory"></property>
</bean>
<!-- 线程池 -->
<!-- 配置线程池 -->
<bean id ="taskExecutor" class ="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" >
<!-- 线程池维护线程的最少数量 -->
<property name ="corePoolSize" value ="5" />
<!-- 线程池维护线程所允许的空闲时间 -->
<property name ="keepAliveSeconds" value ="30000" />
<!-- 线程池维护线程的最大数量 -->
<property name ="maxPoolSize" value ="1000" />
<!-- 线程池所使用的缓冲队列 -->
<property name ="queueCapacity" value ="200" />
</bean>
</beans>
项目的pom.xml redis相关依赖如下:
<!-- redis -->
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>1.6.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-jpa</artifactId>
<version>1.6.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb</artifactId>
<version>1.8.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-hadoop</artifactId>
<version>0.9.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.2</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.5.2</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
下面我们来构建provider相关配置
在META-INF下面新建 Spring文件夹 下面就放我们的provider配置文件比如我创建两个文件
dubbo-demo-provider.xml 项目自带
dubbo-message-provider.xml 消息推送
消息推送配置如下:
<?xml version="1.0" encoding="UTF-8"?>
<!--
- Copyright 1999-2011 Alibaba Group.
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
<!-- 推送信息 -->
<!-- 声明暴露接口 -->
<dubbo:service interface="com.tcwy.service.message.MessageRestService" ref="messagerestserviceimpl" protocol="rest" timeout="2000" connections="100" validation="true"/>
<!-- 提供者实现 -->
<bean id="messagerestserviceimpl" class="com.tcwy.service.message.impl.MessageRestServiceImpl" >
<property name="messageService" ref="messageServiceImpl"></property>
</bean>
<!-- dao实现 -->
<bean id="messageServiceImpl" class="com.tcwy.service.message.impl.MessageServiceImpl"></bean>
<!-- car end -->
</beans>
下面就是怎样编写消息提供者服务接口 具体可以参照 http://blog.csdn.net/laoge121/article/details/16359637 这里就不在一一贴出代码
在我的消息推送系统中我是这样实现大致流程
项目中定义一个监听器当项目启动的时候就开始执行从redis获取消息然后推送(各种场景消息推送) 这个监听里面使用了spring 线程池 然后通过spring的定时任务 每个5秒钟就从数据库中获取一定量消息放入到redis中 这样就构成了一个完整的生产者和消费者流程。
Redis实现消息队列和堆载 这里都是基于对象操作
package com.tcwy.utils;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
//redis操作类
@Component("redisList")
public class RedisList {
@Autowired
protected RedisTemplate redisTemplate;
/**
* 压栈
*
* @param key
* @param value
* @return
*/
public Long push(Object key, Object value) {
return redisTemplate.opsForList().leftPush(key, value);
}
/**
* 出栈
*
* @param key
* @return
*/
public Object pop(Object key) {
return redisTemplate.opsForList().leftPop(key);
}
/**
* 入队
*
* @param key
* @param value
* @return
*/
public Long in(Object key, Object value) {
return redisTemplate.opsForList().rightPush(key, value);
}
/**
* 出队
*
* @param key
* @return
*/
public Object out(Object key) {
return redisTemplate.opsForList().leftPop(key);
}
/**
* 栈/队列长
*
* @param key
* @return
*/
public Long length(Object key) {
return redisTemplate.opsForList().size(key);
}
/**
* 范围检索
*
* @param key
* @param start
* @param end
* @return
*/
public List<Object> range(Object key, int start, int end) {
return redisTemplate.opsForList().range(key, start, end);
}
/**
* 移除
*
* @param key
* @param i
* @param value
*/
public void remove(Object key, long i, Object value) {
redisTemplate.opsForList().remove(key, i, value);
}
/**
* 检索
*
* @param key
* @param index
* @return
*/
public Object index(Object key, long index) {
return redisTemplate.opsForList().index(key, index);
}
/**
* 置值
*
* @param key
* @param index
* @param value
*/
public void set(Object key, long index, Object value) {
redisTemplate.opsForList().set(key, index, value);
}
/**
* 裁剪
*
* @param key
* @param start
* @param end
*/
public void trim(Object key, long start, int end) {
redisTemplate.opsForList().trim(key, start, end);
}
}
下面是应用层使用 Redis工具类
package com.tcwy.utils;
import java.util.List;
import javax.transaction.Transactional;
import org.hibernate.Query;
import org.hibernate.Session;
import org.hibernate.Transaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.tcwy.po.message.Message;
import com.tcwy.service.basedao.BaseDao;
//工具类
@Component("redisUtils")
public class RedisUtils extends BaseDao{
private static Logger logger = LoggerFactory.getLogger(RedisUtils.class);
@Autowired
protected RedisList redisList;
public RedisList getRedisList() {
return redisList;
}
public void setRedisList(RedisList redisList) {
this.redisList = redisList;
}
//写入 队列
@SuppressWarnings("unchecked")
@Transactional
public void inqueue(String queueName,List<Message> list) throws Exception{
for(Message message:list){
logger.debug("消息入队列:"+message.getTarget());
redisList.in(queueName,message);
//修改消息状态
message.setRedisstatus(1);
}
if(list!=null&&list.size()>0){
//批量更新
String updateSql = "update Message set redisstatus = 1 where id = :id";
Session session=getHibernateTemplate().getSessionFactory().openSession();
Transaction transaction=session.beginTransaction();
Query query=null;
for(int i=0;i<list.size();i++){
query=session.createQuery(updateSql);
query.setParameter("id", list.get(i).getId());
query.executeUpdate();
}
transaction.commit();
}
}
//出列
@SuppressWarnings("unchecked")
public Message outqueue(String queueName)throws Exception{
Message message=(Message)redisList.out(queueName);
if(message!=null)logger.debug("消息出队列:"+message.getTarget());
return message;
}
}
好了差不多了 下面附近我把代码上传 实在不好描述呀!_! 当然在实际应用中 zookeeper 和Redis需要避免单点问题。。。