Dubbox+Redis3.0+Spring+Hibernate+zookeeper实现消息推送核心搭建

这里在开始搭建环境之前请搭建先熟悉了解Dubbox、Redis、Zookeeper.... 并且安装Redis、Zookeeper

     下面进入主题首先我们需要下载Dubbox相关代码链接地址:https://github.com/dangdangdotcom/dubbox这里使用Eclipse git工具下载源代码 这里我们具体使用的是 dubbo-demo-provider这个项目 就是服务提供者 这个项目是maven项目在构建war包的时候需要依赖 dubbodubbo-demo-api(这个api项目为什么需要依赖我还没找到原因有朋友知道请告知) 好了 我们来编译dubbo-demo-provider这个项目 主要最好把这个项目单独拷出来引入Eclipse工程中 加入我们已经安装maven环境 。首先进入cmd  到项目所在路径下 比如我的:G:\WokerSpace\dubbo-demo-provider   然后 mvn clean compile 然后 mvn clean install  如果项目没有错误会直接打包成war既可以放到tomcat下部署 注意:tomcat环境需要搭建好。当然我们也可以通过Eclipse来编译和安装 。这里Eclipse还提供直接运行maven项目到 tomcat 右击项目 run as   Run on Server或者 Debug  on Server 启动项目这样启动项目好处是可以断点调试。当然我们需要安装Zookeeper 具体安装参照:http://blog.csdn.net/laoge121/article/details/16359637 这篇文章讲述了Zookeeper  dubbox 、spring集成重点看Zookeeper  安装

    集成Spring 、Hibernate

        首先在WEB-INF下面新建 

               appConfig.properties 配置如下 是jdbc和redis相关配置

              ########################\u6570\u636E\u5E93\u8FDE\u63A5\u4FE1\u606F#############

jdbc.username = root

jdbc.password = 123456

jdbc.url = jdbc:mysql://localhost:3306/tcwyonline?useUnicode=true&characterEncoding=UTF-8

jdbc.driver = com.mysql.jdbc.Driver

#\u6700\u5927\u5206\u914D\u7684\u5BF9\u8C61\u6570  

redis.pool.maxActive=1024

#\u6700\u5927\u80FD\u591F\u4FDD\u6301idel\u72B6\u6001\u7684\u5BF9\u8C61\u6570  

redis.pool.maxIdle=200

#\u5F53\u6C60\u5185\u6CA1\u6709\u8FD4\u56DE\u5BF9\u8C61\u65F6\uFF0C\u6700\u5927\u7B49\u5F85\u65F6\u95F4  

redis.pool.maxWait=1000

#\u5F53\u8C03\u7528borrow Object\u65B9\u6CD5\u65F6\uFF0C\u662F\u5426\u8FDB\u884C\u6709\u6548\u6027\u68C0\u67E5  

redis.pool.testOnBorrow=true

  

#IP  

redis.ip=192.168.1.195

#Port  

redis.port=6379

    applicationContext.xml配置如下 相关配置用途已加以说明

    <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

    xmlns="http://www.springframework.org/schema/beans" xmlns:aop="http://www.springframework.org/schema/aop"

    xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"

    xmlns:cache="http://www.springframework.org/schema/cache" xmlns:p="http://www.springframework.org/schema/p"

    xsi:schemaLocation="http://www.springframework.org/schema/beans 

       http://www.springframework.org/schema/beans/spring-beans-4.0.xsd

       http://www.springframework.org/schema/aop

       http://www.springframework.org/schema/aop/spring-aop-4.0.xsd

       http://www.springframework.org/schema/context

       http://www.springframework.org/schema/context/spring-context-4.0.xsd

       http://www.springframework.org/schema/tx

       http://www.springframework.org/schema/tx/spring-tx-4.0.xsd

       http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache-4.0.xsd">

    <!-- 引入properties文件 -->

     <context:property-placeholder location="/WEB-INF/appConfig.properties" /> 

    <!-- <context:property-placeholder location="/WEB-INF/*.properties"/>-->

    

    <!-- 定义数据库连接池数据源bean destroy-method="close"的作用是当数据库连接不使用的时候,就把该连接重新放到数据池中,方便下次使用调用 -->

    <bean id="dataSource" class="com.mchange.v2.c3p0.ComboPooledDataSource"

        destroy-method="close">

        <!-- 设置JDBC驱动名称 -->

        <property name="driverClass" value="${jdbc.driver}" />

        <!-- 设置JDBC连接URL -->

        <property name="jdbcUrl" value="${jdbc.url}" />

        <!-- 设置数据库用户名 -->

        <property name="user" value="${jdbc.username}" />

        <!-- 设置数据库密码 -->

        <property name="password" value="${jdbc.password}" />

        <!-- 设置连接池初始值 -->

        <property name="initialPoolSize" value="5" />

    </bean>

    <!-- 配置sessionFactory -->

    <bean id="sessionFactory"

        class="org.springframework.orm.hibernate4.LocalSessionFactoryBean">

        <!-- 数据源 -->

        <property name="dataSource" ref="dataSource" />

        <!-- hibernate的相关属性配置 -->

        <property name="hibernateProperties">

            <value>

                <!-- 设置数据库方言 -->

                hibernate.dialect=org.hibernate.dialect.MySQLDialect

                <!-- 设置自动创建|更新|验证数据库表结构 -->

                hibernate.hbm2ddl.auto=update

                <!-- 是否在控制台显示sql -->

                hibernate.show_sql=true

                <!-- 是否格式化sql,优化显示 -->

                hibernate.format_sql=true

                <!-- 是否开启二级缓存 -->

                hibernate.cache.use_second_level_cache=false

                <!-- 是否开启查询缓存 -->

                hibernate.cache.use_query_cache=false

                <!-- 数据库批量查询最大数 -->

                hibernate.jdbc.fetch_size=50

                <!-- 数据库批量更新、添加、删除操作最大数 -->

                hibernate.jdbc.batch_size=50

                <!-- 是否自动提交事务 -->

                hibernate.connection.autocommit=true

                <!-- 指定hibernate在何时释放JDBC连接 -->

                hibernate.connection.release_mode=auto

                <!-- 创建session方式 hibernate4.x 的方式 -->

                hibernate.current_session_context_class=org.springframework.orm.hibernate4.SpringSessionContext

                <!-- javax.persistence.validation.mode默认情况下是auto的,就是说如果不设置的话它是会自动去你的classpath下面找一个bean-validation**包 

                    所以把它设置为none即可 -->

                javax.persistence.validation.mode=none

            </value>

        </property>

        <!-- 自动扫描实体对象 tdxy.bean的包结构中存放实体类 -->

        <property name="packagesToScan" value="com.tcwy" />

    </bean>

    <!-- 定义事务管理 -->

    <bean id="transactionManager"

        class="org.springframework.orm.hibernate4.HibernateTransactionManager">

        <property name="sessionFactory" ref="sessionFactory" />

    </bean>

    

    <!-- 定义 Autowired  自动注入 bean -->

    <bean class="org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor"/> 

    

    <!-- 扫描有注解的文件  base-package 包路径 -->

    <context:component-scan base-package="com.tcwy"/>

    

    <tx:advice id="txAdvice" transaction-manager="transactionManager">

        <tx:attributes>

            <!-- 事务执行方式

                REQUIRED:指定当前方法必需在事务环境中运行,

                如果当前有事务环境就加入当前正在执行的事务环境,

                如果当前没有事务,就新建一个事务。

                这是默认值。 

             -->

            <tx:method name="send*" propagation="REQUIRED" />

            <tx:method name="create*" propagation="REQUIRED" />

            <tx:method name="save*" propagation="REQUIRED" />

            <tx:method name="add*" propagation="REQUIRED" />

            <tx:method name="update*" propagation="REQUIRED" />

            <tx:method name="remove*" propagation="REQUIRED" />

            <tx:method name="del*" propagation="REQUIRED" />

            <tx:method name="import*" propagation="REQUIRED" />

            <!-- 

                指定当前方法以非事务方式执行操作,如果当前存在事务,就把当前事务挂起,等我以非事务的状态运行完,再继续原来的事务。 

                查询定义即可

                read-only="true"  表示只读

             -->

            <tx:method name="*" propagation="NOT_SUPPORTED" read-only="true" />

        </tx:attributes>

    </tx:advice>

    <!-- 定义切面,在 * tdxy.*.service.*ServiceImpl.*(..) 中执行有关的hibernate session的事务操作 -->

    <aop:config>

        <aop:pointcut id="serviceOperation" expression="execution(* com.tcwy.service.*.impl.*.*(..))" />

        <aop:advisor advice-ref="txAdvice" pointcut-ref="serviceOperation" />

    </aop:config>

     <!--启动spring事务注解功能-->  

    <tx:annotation-driven transaction-manager="txAdvice"/> 

    

    

    

    <!-- spring 定时器 -->

    <bean id="messagetask" class="com.tcwy.service.taskservice.impl.TaskServiceImp"> 

   

</bean> 

<!-- 间隔2秒钟执行任务 -->

    <bean id="messagetaskTopSaleJobDetail" class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean"> 

<property name="targetObject" ref="messagetask"></property> 

<property name="targetMethod" value="work"></property> 

</bean> 

<bean id="messagetaskTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean"> 

<property name="jobDetail"> 

<ref bean="messagetaskTopSaleJobDetail"/> 

</property> 

<property name="cronExpression"> 

<value>*/5 * * * * ?</value>       <!-- 0 */1 * * * ?  1分钟 -->

</property>       

</bean> 

<bean class="org.springframework.scheduling.quartz.SchedulerFactoryBean"> 

 <property name="triggers"> 

<list> 

<ref bean="messagetaskTrigger"/> 

</list>  

 </property> 

</bean>  

<!-- redis -->

<bean

id="jedisPoolConfig"

class="redis.clients.jedis.JedisPoolConfig">

</bean>

<bean

id="jedisConnectionFactory"

class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">

<property

name="hostName"

value="${redis.ip}" />

<property

name="port"

value="${redis.port}" />

<property

name="poolConfig"

ref="jedisPoolConfig" />

</bean>

<!-- <bean class="org.springframework.data.redis.core.RedisTemplate"

p:connection-factory-ref="jedisConnectionFactory" />

-->

<bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">

        <property name="connectionFactory" ref="jedisConnectionFactory" />

    </bean>

    

     <bean id="stringRedisTemplate" class="org.springframework.data.redis.core.StringRedisTemplate">  

        <property name="connectionFactory"   ref="jedisConnectionFactory" />  

    </bean>   

    

<bean id="redisSerializer" class="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer"/>

<bean id="hibernateTemplate" class="org.springframework.orm.hibernate4.HibernateTemplate">

   <property name="sessionFactory" ref="sessionFactory"></property>

</bean>

<!-- 线程池 -->

<!-- 配置线程池 -->

<bean id ="taskExecutor"  class ="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" >

<!-- 线程池维护线程的最少数量 -->

<property name ="corePoolSize" value ="5" />

<!-- 线程池维护线程所允许的空闲时间 -->

<property name ="keepAliveSeconds" value ="30000" />

<!-- 线程池维护线程的最大数量 -->

<property name ="maxPoolSize" value ="1000" />

<!-- 线程池所使用的缓冲队列 -->

<property name ="queueCapacity" value ="200" />

</bean>

</beans>

 项目的pom.xml redis相关依赖如下:

   <!-- redis -->

<dependency>

<groupId>org.springframework.data</groupId>

<artifactId>spring-data-redis</artifactId>

<version>1.6.2.RELEASE</version>

</dependency>

<dependency>

<groupId>org.springframework.data</groupId>

<artifactId>spring-data-jpa</artifactId>

<version>1.6.2.RELEASE</version>

</dependency>

<dependency>

<groupId>org.springframework.data</groupId>

<artifactId>spring-data-mongodb</artifactId>

<version>1.8.2.RELEASE</version>

</dependency>

<dependency>

<groupId>org.springframework.data</groupId>

<artifactId>spring-data-hadoop</artifactId>

<version>0.9.0.RELEASE</version>

</dependency>

<dependency>

   <groupId>org.apache.commons</groupId>

   <artifactId>commons-pool2</artifactId>

   <version>2.2</version>

</dependency>

<dependency>

       <groupId>redis.clients</groupId>

       <artifactId>jedis</artifactId>

       <version>2.5.2</version>

       <type>jar</type>

       <scope>compile</scope>

   </dependency>

   下面我们来构建provider相关配置

        在META-INF下面新建 Spring文件夹 下面就放我们的provider配置文件比如我创建两个文件

               dubbo-demo-provider.xml  项目自带

               dubbo-message-provider.xml  消息推送

                消息推送配置如下:

                

                      <?xml version="1.0" encoding="UTF-8"?>

<!--

 - Copyright 1999-2011 Alibaba Group.

 -  

 - Licensed under the Apache License, Version 2.0 (the "License");

 - you may not use this file except in compliance with the License.

 - You may obtain a copy of the License at

 -  

 -      http://www.apache.org/licenses/LICENSE-2.0

 -  

 - Unless required by applicable law or agreed to in writing, software

 - distributed under the License is distributed on an "AS IS" BASIS,

 - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 - See the License for the specific language governing permissions and

 - limitations under the License.

-->

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"

xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd

http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd">

   

<!-- 推送信息 -->

<!-- 声明暴露接口 -->

<dubbo:service interface="com.tcwy.service.message.MessageRestService" ref="messagerestserviceimpl"  protocol="rest"  timeout="2000" connections="100" validation="true"/>

<!-- 提供者实现 -->

<bean id="messagerestserviceimpl" class="com.tcwy.service.message.impl.MessageRestServiceImpl" >

<property name="messageService" ref="messageServiceImpl"></property>

</bean>

<!-- dao实现 -->

<bean id="messageServiceImpl" class="com.tcwy.service.message.impl.MessageServiceImpl"></bean>

<!-- car end -->

    

</beans>

           

     下面就是怎样编写消息提供者服务接口  具体可以参照 http://blog.csdn.net/laoge121/article/details/16359637 这里就不在一一贴出代码

     在我的消息推送系统中我是这样实现大致流程

      项目中定义一个监听器当项目启动的时候就开始执行从redis获取消息然后推送(各种场景消息推送) 这个监听里面使用了spring 线程池  然后通过spring的定时任务 每个5秒钟就从数据库中获取一定量消息放入到redis中 这样就构成了一个完整的生产者和消费者流程。

     Redis实现消息队列和堆载 这里都是基于对象操作

      package com.tcwy.utils;

import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.data.redis.core.RedisTemplate;

import org.springframework.data.redis.core.StringRedisTemplate;

import org.springframework.stereotype.Component;

//redis操作类

@Component("redisList")  

public class RedisList {

@Autowired  

protected RedisTemplate redisTemplate;  

/** 

     * 压栈 

     *  

     * @param key 

     * @param value 

     * @return 

     */  

    public Long push(Object key, Object value) {  

        return redisTemplate.opsForList().leftPush(key, value);  

    }  

  

    /** 

     * 出栈 

     *  

     * @param key 

     * @return 

     */  

    public Object pop(Object key) {  

        return redisTemplate.opsForList().leftPop(key);  

    }  

  

    /** 

     * 入队 

     *  

     * @param key 

     * @param value 

     * @return 

     */  

    public Long in(Object key, Object value) {  

    return redisTemplate.opsForList().rightPush(key, value);

    }  

  

    /** 

     * 出队 

     *  

     * @param key 

     * @return 

     */  

    public Object out(Object key) {  

        return redisTemplate.opsForList().leftPop(key);  

    }  

  

    /** 

     * 栈/队列长 

     *  

     * @param key 

     * @return 

     */  

    public Long length(Object key) {  

        return redisTemplate.opsForList().size(key);  

    }  

  

    /** 

     * 范围检索 

     *  

     * @param key 

     * @param start 

     * @param end 

     * @return 

     */  

    public List<Object> range(Object key, int start, int end) {  

        return redisTemplate.opsForList().range(key, start, end);  

    }  

  

    /** 

     * 移除 

     *  

     * @param key 

     * @param i 

     * @param value 

     */  

    public void remove(Object key, long i, Object value) {  

    redisTemplate.opsForList().remove(key, i, value);  

    }  

  

    /** 

     * 检索 

     *  

     * @param key 

     * @param index 

     * @return 

     */  

    public Object index(Object key, long index) {  

        return redisTemplate.opsForList().index(key, index);  

    }  

  

    /** 

     * 置值 

     *  

     * @param key 

     * @param index 

     * @param value 

     */  

    public void set(Object key, long index, Object value) {  

    redisTemplate.opsForList().set(key, index, value);  

    }  

  

    /** 

     * 裁剪 

     *  

     * @param key 

     * @param start 

     * @param end 

     */  

    public void trim(Object key, long start, int end) {  

    redisTemplate.opsForList().trim(key, start, end);  

    } 

}

下面是应用层使用 Redis工具类

   package com.tcwy.utils;

import java.util.List;

import javax.transaction.Transactional;

import org.hibernate.Query;

import org.hibernate.Session;

import org.hibernate.Transaction;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

import com.tcwy.po.message.Message;

import com.tcwy.service.basedao.BaseDao;

//工具类

@Component("redisUtils")  

public class RedisUtils extends BaseDao{

private static Logger logger = LoggerFactory.getLogger(RedisUtils.class);

@Autowired

protected RedisList redisList;

public RedisList getRedisList() {

return redisList;

}

public void setRedisList(RedisList redisList) {

this.redisList = redisList;

}

//写入 队列

@SuppressWarnings("unchecked")

@Transactional

public void inqueue(String queueName,List<Message> list) throws Exception{

for(Message message:list){

logger.debug("消息入队列:"+message.getTarget());

redisList.in(queueName,message);

//修改消息状态

message.setRedisstatus(1);

}

if(list!=null&&list.size()>0){

//批量更新

String updateSql = "update Message set redisstatus = 1 where id = :id";

Session session=getHibernateTemplate().getSessionFactory().openSession();

Transaction transaction=session.beginTransaction();

Query query=null;

for(int i=0;i<list.size();i++){

query=session.createQuery(updateSql);

query.setParameter("id", list.get(i).getId());

query.executeUpdate();

}

transaction.commit();

}

}

//出列

@SuppressWarnings("unchecked")

public Message outqueue(String queueName)throws Exception{

Message message=(Message)redisList.out(queueName);

if(message!=null)logger.debug("消息出队列:"+message.getTarget());

return message;

}

}

好了差不多了 下面附近我把代码上传 实在不好描述呀!_!  当然在实际应用中 zookeeper 和Redis需要避免单点问题。。。