redis作为消息队列的使用
========================== 2017/3/31更新=======================
redisson实现了分布式和可扩展的java数据结构,支持的数据结构有:List, Set, Map, Queue, SortedSet, ConcureentMap, Lock, AtomicLong, CountDownLatch。并且是线程安全的,底层使用Netty 4实现网络通信。和jedis相比,功能比较简单,不支持排序,事务,管道,分区等redis特性,可以认为是jedis的补充,不能替换jedis。
redission.getQueue支持队列
==============================================================
在redis支持的数据结构中,有一个是集合list. 对List的操作常见的有lpush lrange等。在这种常见的操作时,我们是把集合当做典型意义上的‘集合’来使用的。往往容易被忽视的是List作为“队列”的使用情况。
反编译redis的jar包,会发现:
public String rpop(String key) { checkIsInMulti(); this.client.rpop(key); return this.client.getBulkReply(); } public String rpoplpush(String srckey, String dstkey) { checkIsInMulti(); this.client.rpoplpush(srckey, dstkey); return this.client.getBulkReply(); } public String lpop(String key) { checkIsInMulti(); this.client.lpop(key); return this.client.getBulkReply(); }
pop意为“弹”,是队列里的取出元素。rpop意为"right pop"意思就是从队列的右边取元素,lpop就是"left pop"当然就是从队列的左边取元素了。对应取元素,我们往队列里面Push元素同样有2个方向:
public Long lpush(String key, String[] strings) { checkIsInMulti(); this.client.lpush(key, strings); return this.client.getIntegerReply(); } public Long rpush(String key, String[] strings) { checkIsInMulti(); this.client.rpush(key, strings); return this.client.getIntegerReply(); }
好了,下面正式开始测试下redis中List数据结构作为队列的使用方法:
假设我需要往队列里面Push map,该怎么做呢? push String值的话比价简单,这里不再研究了……
往队列里push map的话,有以下2种方式:
第一种:
往队列里push一个map:
class PushThread implements Runnable { @Override public void run() { Jedis jedis = null; try{ jedis = JedisPoolUtils.getJedis(); for(int i=0; i<10; i++) { Map map = new HashMap(); map.put("test", "test" + i); map.put("ok", "ok" + i); jedis.lpush("test".getBytes(), ObjectUtils.objectToBytes(map));//Object对象转换成byte数组的方法,这里省略 } } catch(Exception e) { e.printStackTrace(); } finally{ JedisPoolUtils.returnRes(jedis); } } }
从队列里取出map:
class PopThread implements Runnable { @Override public void run() { Jedis jedis = null; try{ jedis = JedisPoolUtils.getJedis(); for(int i=0; i<10; i++) { byte[] key = jedis.rpop("test".getBytes()); System.out.println("弹出:" + ObjectUtils.bytesToObject(key));//byte数组转换为Object对象的方法,这里省略 } } catch(Exception e) { e.printStackTrace(); } finally{ JedisPoolUtils.returnRes(jedis); } }
依次启动Push线程和pop线程,可在pop线程端看到打印结果:
弹出:{ok=ok0, test=test0}
弹出:{ok=ok1, test=test1}
弹出:{ok=ok2, test=test2}
弹出:{ok=ok3, test=test3}
弹出:{ok=ok4, test=test4}
弹出:{ok=ok5, test=test5}
弹出:{ok=ok6, test=test6}
弹出:{ok=ok7, test=test7}
弹出:{ok=ok8, test=test8}
弹出:{ok=ok9, test=test9}
第二种方式:
因为第一种方式毕竟需要在byte数组和Object对象之间来回转换,效率上可能存在一定影响。所以我们加以改进。不妨只把map的key放在一个队列中,取队列元素时,只取key,然后根据key取map即可。
push:
class PushThread implements Runnable { @Override public void run() { Jedis jedis = null; try{ jedis = JedisPoolUtils.getJedis(); for(int i=0; i<10; i++) { Map map = new HashMap(); map.put("test", "test" + i); map.put("ok", "ok" + i); String key = "map" + i; jedis.lpush("test", key);//把所有map的key放入一个队列test中 jedis.hmset(key, map); } } catch(Exception e) { e.printStackTrace(); } finally{ JedisPoolUtils.returnRes(jedis); } }
pop元素:
class PopThread implements Runnable { @Override public void run() { Jedis jedis = null; try{ jedis = JedisPoolUtils.getJedis(); for(int i=0; i<10; i++) { String key = jedis.rpop("test"); System.out.println("弹出:" + jedis.hgetAll(key)); } } catch(Exception e) { e.printStackTrace(); } finally{ JedisPoolUtils.returnRes(jedis); } } }
依次启动Push线程和pop线程,可在pop线程端看到打印结果:
弹出:{test=test0, ok=ok0}
弹出:{test=test1, ok=ok1}
弹出:{test=test2, ok=ok2}
弹出:{test=test3, ok=ok3}
弹出:{test=test4, ok=ok4}
弹出:{test=test5, ok=ok5}
弹出:{test=test6, ok=ok6}
弹出:{test=test7, ok=ok7}
弹出:{test=test8, ok=ok8}
弹出:{test=test9, ok=ok9}
两种方式效果一样,只不过第二种效率上更好一些。
注:当你依次从队列里取出元素后,队列在redis中就不存在了,所以当你pop完元素,再尝试pop的话,会报异常:
redis.clients.jedis.exceptions.JedisDataException: value sent to redis cannot be null
同时,redis中这个队列也不复存在了!
利用redis作为队列的性质我们可以用它来达到类似ZMQ的作用。。