Spring-data-redis: 分布式队列

Redis中list数据结构,具有“双端队列”的特性,同时redis具有持久数据的能力,因此redis实现分布式队列是非常安全可靠的。它类似于JMS中的“Queue”,只不过功能和可靠性(事务性)并没有JMS严格。Redis本身的高性能和"便捷的"分布式设计(replicas,sharding),可以为实现"分布式队列"提供了良好的基础.

Redis中的队列阻塞时,整个connection都无法继续进行其他操作,因此在基于连接池设计是需要注意。

我们通过spring-data-redis,来实现“同步队列”,设计风格类似与JMS。不过本实例中,并没有提供关于队列消费之后的消息确认机制,如果你感兴趣可以自己尝试实现它。

1)Redis中的"队列"为双端队列,基于list数据结构实现,并提供了"队列阻塞"功能.

2)如果你期望使用redis做"分布式队列"server,且数据存取较为密集时,务必配置(redis.conf)中关于list数据结构的限制:

Java代码收藏代码

//当list中数据个数达到阀值是,将会被重构为linkedlist

//如果队列的存/取速度较为接近,此值可以稍大

list-max-ziplist-entries5120

list-max-ziplist-value1024

3)Redis已经提供了"队列"的持久化能力,无需额外的技术支持

4)Redis并没有提供JMS语义中"queue"消息的消费确认的功能,即当队列中的消息被redis-client接收之后,并不会执行"确认消息已到达"的操作;如果你的分布式队列,需要严格的消息确认,需要额外的技术支持.

5)Redis并不能像JMS那样提供高度中心化的"队列"服务集群,它更适合"快速/小巧/及时消费"的情景.

6)本例中,对于消息的接收,是在一个后台线程中进行(参见下文RedisQueue),其实我们可以使用线程池的方式来做,以提高性能.不过此方案,需要基于2个前提:

A)如果单个queue中的消息较多,且每条消息的处理时间较长(即消费速度比接收的速度慢)

B)如果此线程池可以被多个queue公用线程资源,如果一个queue就创建一个线程池,实在是有些浪费且存在不安全问题.

C)需要确认,多线程环境中对queue的操作,有可能在客户端层面打乱了队列的顺序,而造成异常.比如线程1从queue中获得data1,线程2从queue中获得data2,有可能因为线程调度的问题,导致data2被优先执行.

一.配置文件:

Java代码收藏代码

<beansxmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsd"default-autowire="byName">

<beanid="jedisPoolConfig"class="redis.clients.jedis.JedisPoolConfig">

<propertyname="maxActive"value="32"></property>

<propertyname="maxIdle"value="6"></property>

<propertyname="maxWait"value="15000"></property>

<propertyname="minEvictableIdleTimeMillis"value="300000"></property>

<propertyname="numTestsPerEvictionRun"value="3"></property>

<propertyname="timeBetweenEvictionRunsMillis"value="60000"></property>

<propertyname="whenExhaustedAction"value="1"></property>

</bean>

<beanid="jedisConnectionFactory"class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"destroy-method="destroy">

<propertyname="poolConfig"ref="jedisPoolConfig"></property>

<propertyname="hostName"value="127.0.0.1"></property>

<propertyname="port"value="6379"></property>

<propertyname="password"value="0123456"></property>

<propertyname="timeout"value="15000"></property>

<propertyname="usePool"value="true"></property>

</bean>

<beanid="jedisTemplate"class="org.springframework.data.redis.core.RedisTemplate">

<propertyname="connectionFactory"ref="jedisConnectionFactory"></property>

<propertyname="defaultSerializer">

<beanclass="org.springframework.data.redis.serializer.StringRedisSerializer"/>

</property>

</bean>

<beanid="jedisQueueListener"class="com.sample.redis.sdr.QueueListener"/>

<beanid="jedisQueue"class="com.sample.redis.sdr.RedisQueue"destroy-method="destroy">

<propertyname="redisTemplate"ref="jedisTemplate"></property>

<propertyname="key"value="user:queue"></property>

<propertyname="listener"ref="jedisQueueListener"></property>

</bean>

</beans>

二.程序实例:

1)QueueListener:当队列中有数据时,可以执行类似于JMS的回调操作。

Java代码收藏代码

publicinterfaceRedisQueueListener<T>{

publicvoidonMessage(Tvalue);

}

Java代码收藏代码

publicclassQueueListener<String>implementsRedisQueueListener<String>{

@Override

publicvoidonMessage(Stringvalue){

System.out.println(value);

}

}

2)RedisQueue:队列操作,内部封装redisTemplate实例;如果配置了“listener”,那么queue将采用“消息回调”的方式执行,listenerThread是一个后台线程,用来自动处理“队列信息”。如果不配置“listener”,那么你可以将redisQueue注入到其他springbean中,手动去“take”数据即可。

Java代码收藏代码

publicclassRedisQueue<T>implementsInitializingBean,DisposableBean{

privateRedisTemplateredisTemplate;

privateStringkey;

privateintcap=Short.MAX_VALUE;//最大阻塞的容量,超过容量将会导致清空旧数据

privatebyte[]rawKey;

privateRedisConnectionFactoryfactory;

privateRedisConnectionconnection;//forblocking

privateBoundListOperations<String,T>listOperations;//noblocking

privateLocklock=newReentrantLock();//基于底层IO阻塞考虑

privateRedisQueueListenerlistener;//异步回调

privateThreadlistenerThread;

privatebooleanisClosed;

publicvoidsetRedisTemplate(RedisTemplateredisTemplate){

this.redisTemplate=redisTemplate;

}

publicvoidsetListener(RedisQueueListenerlistener){

this.listener=listener;

}

publicvoidsetKey(Stringkey){

this.key=key;

}

@Override

publicvoidafterPropertiesSet()throwsException{

factory=redisTemplate.getConnectionFactory();

connection=RedisConnectionUtils.getConnection(factory);

rawKey=redisTemplate.getKeySerializer().serialize(key);

listOperations=redisTemplate.boundListOps(key);

if(listener!=null){

listenerThread=newListenerThread();

listenerThread.setDaemon(true);

listenerThread.start();

}

}

/**

*blocking

*removeandgetlastitemfromqueue:BRPOP

*@return

*/

publicTtakeFromTail(inttimeout)throwsInterruptedException{

lock.lockInterruptibly();

try{

List<byte[]>results=connection.bRPop(timeout,rawKey);

if(CollectionUtils.isEmpty(results)){

returnnull;

}

return(T)redisTemplate.getValueSerializer().deserialize(results.get(1));

}finally{

lock.unlock();

}

}

publicTtakeFromTail()throwsInterruptedException{

returntakeFromTail(0);

}

/**

*从队列的头,插入

*/

publicvoidpushFromHead(Tvalue){

listOperations.leftPush(value);

}

publicvoidpushFromTail(Tvalue){

listOperations.rightPush(value);

}

/**

*noblocking

*@returnnullifnoiteminqueue

*/

publicTremoveFromHead(){

returnlistOperations.leftPop();

}

publicTremoveFromTail(){

returnlistOperations.rightPop();

}

/**

*blocking

*removeandgetfirstitemfromqueue:BLPOP

*@return

*/

publicTtakeFromHead(inttimeout)throwsInterruptedException{

lock.lockInterruptibly();

try{

List<byte[]>results=connection.bLPop(timeout,rawKey);

if(CollectionUtils.isEmpty(results)){

returnnull;

}

return(T)redisTemplate.getValueSerializer().deserialize(results.get(1));

}finally{

lock.unlock();

}

}

publicTtakeFromHead()throwsInterruptedException{

returntakeFromHead(0);

}

@Override

publicvoiddestroy()throwsException{

if(isClosed){

return;

}

shutdown();

RedisConnectionUtils.releaseConnection(connection,factory);

}

privatevoidshutdown(){

try{

listenerThread.interrupt();

}catch(Exceptione){

//

}

}

classListenerThreadextendsThread{

@Override

publicvoidrun(){

try{

while(true){

Tvalue=takeFromHead();//castexception?youshouldcheck.

//逐个执行

if(value!=null){

try{

listener.onMessage(value);

}catch(Exceptione){

//

}

}

}

}catch(InterruptedExceptione){

//

}

}

}

}

3)使用与测试:

Java代码收藏代码

publicstaticvoidmain(String[]args)throwsException{

ClassPathXmlApplicationContextcontext=newClassPathXmlApplicationContext("classpath:spring-redis-beans.xml");

RedisQueue<String>redisQueue=(RedisQueue)context.getBean("jedisQueue");

redisQueue.pushFromHead("test:app");

Thread.sleep(15000);

redisQueue.pushFromHead("test:app");

Thread.sleep(15000);

redisQueue.destroy();

}

在程序运行期间,你可以通过redis-cli(客户端窗口)执行“lpush”,你会发现程序的控制台仍然能够正常打印队列信息。

相关推荐