java并发容器
java并发容器(Map、List、BlockingQueue)(2011-03-0817:45:35)转载▼标签:java并发容器maplistblockingqueue杂谈分类:架构与开发
Java库本身就有多种线程安全的容器和同步工具,其中同步容器包括两部分:一个是Vector和Hashtable。另外还有JDK1.2中加入的同步包装类,这些类都是由Collections.synchronizedXXX工厂方法。同步容器都是线程安全的,但是对于复合操作,缺有些缺点:
① 迭代:在查觉到容器在迭代开始以后被修改,会抛出一个未检查异常ConcurrentModificationException,为了避免这个异常,需要在迭代期间,持有一个容器锁。但是锁的缺点也很明显,就是对性能的影响。
② 隐藏迭代器:StringBuilder的toString方法会通过迭代容器中的每个元素,另外容器的hashCode和equals方法也会间接地调用迭代。类似地,contailAll、removeAll、retainAll方法,以及容器作为参数的构造函数,都会对容器进行迭代。
③ 缺少即加入等一些复合操作
publicstaticObjectgetLast(Vectorlist){
intlastIndex=list.size()-1;
returnlist.get(lastIndex);
}
publicstaticvoiddeleteLast(Vectorlist){
intlastIndex=list.size()-1;
list.remove(lastIndex);
}
getLast和deleteLast都是复合操作,由先前对原子性的分析可以判断,这依然存在线程安全问题,有可能会抛出ArrayIndexOutOfBoundsException的异常,错误产生的逻辑如下所示:
解决办法就是通过对这些复合操作加锁
1并发容器类
正是由于同步容器类有以上问题,导致这些类成了鸡肋,于是Java5推出了并发容器类,Map对应的有ConcurrentHashMap,List对应的有CopyOnWriteArrayList。与同步容器类相比,它有以下特性:
1.1ConcurrentHashMap
·更加细化的锁机制。同步容器直接把容器对象做为锁,这样就把所有操作串行化,其实这是没必要的,过于悲观,而并发容器采用更细粒度的锁机制,名叫分离锁,保证一些不会发生并发问题的操作进行并行执行
·附加了一些原子性的复合操作。比如putIfAbsent方法
·迭代器的弱一致性,而非“及时失败”。它在迭代过程中不再抛出Concurrentmodificationexception异常,而是弱一致性。
·在并发高的情况下,有可能size和isEmpty方法不准确,但真正在并发环境下这些方法也没什么作用。
·另外,它还有一些附加的原子操作,缺少即加入、相等便移除、相等便替换。
putIfAbsent(Kkey,Vvalue),缺少即加入(如果该键已经存在,则不加入)
如果指定键已经不再与某个值相关联,则将它与给定值关联。
类似于下面的操作
If(!map.containsKey(key)){
returnmap.put(key,value);
}else{
returnmap.get(key);
}
remove(Objectkey,Objectvalue),相等便移除
只有目前将键的条目映射到给定值时,才移除该键的条目。
类似于下面的:
if(map.containsKey(key)&&map.get(key).equals(value)){
Map.remove();
returntrue;
}else{
returnfalse;
}
replace(Kkey,Vvalue)
replace(Kkey,VoldValue,VnewValue),相等便替换。
只有目前将键的条目映射到某一值时,才替换该键的条目。
上面提到的三个,都是原子的。在一些缓存应用中可以考虑代替HashMap/Hashtable。
1.2CopyOnWriteArrayList和CopyOnWriteArraySet
·CopyOnWriteArrayList采用写入时复制的方式避开并发问题。这其实是通过冗余和不可变性来解决并发问题,在性能上会有比较大的代价,但如果写入的操作远远小于迭代和读操作,那么性能就差别不大了。
应用它们的场合通常在数组相对较小,并且遍历操作的数量大大超过可变操作的数量时,这种场合应用它们非常好。它们所有可变的操作都是先取得后台数组的副本,对副本进行更改,然后替换副本,这样可以保证永远不会抛出ConcurrentModificationException移除。
2队列
Java中的队列接口就是Queue,它有会抛出异常的add、remove方法,在队尾插入元素以及对头移除元素,还有不会抛出异常的,对应的offer、poll方法。
2.1LinkedList
List实现了deque接口以及List接口,可以将它看做是这两种的任何一种。
Queuequeue=newLinkedList();
queue.offer("testone");
queue.offer("testtwo");
queue.offer("testthree");
queue.offer("testfour");
System.out.println(queue.poll());//testone
2.2PriorityQueue
一个基于优先级堆(简单的使用链表的话,可能插入的效率会比较低O(N))的无界优先级队列。优先级队列的元素按照其自然顺序进行排序,或者根据构造队列时提供的Comparator进行排序,具体取决于所使用的构造方法。优先级队列不允许使用null元素。依靠自然顺序的优先级队列还不允许插入不可比较的对象。
queue=newPriorityQueue();
queue.offer("testone");
queue.offer("testtwo");
queue.offer("testthree");
queue.offer("testfour");
System.out.println(queue.poll());//testfour
2.3ConcurrentLinkedQueue
基于链节点的,线程安全的队列。并发访问不需要同步。在队列的尾部添加元素,并在头部删除他们。所以只要不需要知道队列的大小,并发队列就是比较好的选择。
3阻塞队列
3.1生产者和消费者模式
生产者和消费者模式,生产者不需要知道消费者的身份或者数量,甚至根本没有消费者,他们只负责把数据放入队列。类似地,消费者也不需要知道生产者是谁,以及是谁给他们安排的工作。
而Java知道大家清楚这个模式的并发复杂性,于是乎提供了阻塞队列(BlockingQueue)来满足这个模式的需求。阻塞队列说起来很简单,就是当队满的时候写线程会等待,直到队列不满的时候;当队空的时候读线程会等待,直到队不空的时候。实现这种模式的方法很多,其区别也就在于谁的消耗更低和等待的策略更优。以LinkedBlockingQueue的具体实现为例,它的put源码如下:
publicvoidput(Ee)throwsInterruptedException{
if(e==null)thrownewNullPointerException();
intc=-1;
finalReentrantLockputLock=this.putLock;
finalAtomicIntegercount=this.count;
putLock.lockInterruptibly();
try{
try{
while(count.get()==capacity)
notFull.await();
}catch(InterruptedExceptionie){
notFull.signal();
//propagatetoanon-interruptedthread
throwie;
}
insert(e);
c=count.getAndIncrement();
if(c+1<capacity)
notFull.signal();
}finally{
putLock.unlock();
}
if(c==0)
signalNotEmpty();
}
撇开其锁的具体实现,其流程就是我们在操作系统课上学习到的标准生产者模式,看来那些枯燥的理论还是有用武之地的。其中,最核心的还是Java的锁实现,有兴趣的朋友可以再进一步深究一下。
阻塞队列Blockingqueue,提供了可阻塞的put和take方法,他们与可定时的offer和poll方法是等价。Put方法简化了处理,如果是有界队列,那么当队列满的时候,生成者就会阻塞,从而改消费者更多的追赶速度。
3.2ArrayBlockingQueue和LinkedBlockingQueue
FIFO的队列,与LinkedList(由链节点支持,无界)和ArrayList(由数组支持,有界)相似(Linked有更好的插入和移除性能,Array有更好的查找性能,考虑到阻塞队列的特性,移除头部,加入尾部,两个都区别不大),但是却拥有比同步List更好的并发性能。
另外,LinkedList永远不会等待,因为他是无界的。
BlockingQueue<String>queue=newArrayBlockingQueue<String>(5);
Producerp=newProducer(queue);
Consumerc1=newConsumer(queue);
Consumerc2=newConsumer(queue);
newThread(p).start();
newThread(c1).start();
newThread(c2).start();
classProducerimplementsRunnable{
privatefinalBlockingQueuequeue;
Producer(BlockingQueueq){queue=q;}
publicvoidrun(){
try{
for(inti=0;i<100;i++){
queue.put(produce());
}
}catch(InterruptedExceptionex){}
}
Stringproduce(){
Stringtemp=""+(char)('A'+(int)(Math.random()*26));
System.out.println("produce"+temp);
returntemp;
}
}
classConsumerimplementsRunnable{
privatefinalBlockingQueuequeue;
Consumer(BlockingQueueq){queue=q;}
publicvoidrun(){
try{
for(inti=0;i<100;i++){
consume(queue.take());
}
}catch(InterruptedExceptionex){}
}
voidconsume(Objectx){
System.out.println("cousume"+x.toString());
}
}
输出:
produceK
cousumeK
produceV
cousumeV
produceQ
cousumeQ
produceI
produceD
produceI
produceG
produceA
produceE
cousumeD
3.3PriorityBlockingQueue
一个按优先级堆支持的无界优先级队列队列,如果不希望按照FIFO的顺序进行处理,它非常有用。它可以比较元素本身的自然顺序,也可以使用一个Comparator排序。
3.4DelayQueue
一个优先级堆支持的,基于时间的调度队列。加入到队列中的元素必须实现新的Delayed接口(只有一个方法,LonggetDelay(java.util.concurrent.TimeUnitunit)),添加可以理立即返回,但是在延迟时间过去之前,不能从队列中取出元素,如果多个元素的延迟时间已到,那么最早失效链接/失效时间最长的元素将第一个取出。
staticclassNanoDelayimplementsDelayed{
longtigger;
NanoDelay(longi){
tigger=System.nanoTime()+i;
}
publicbooleanequals(Objectother){
return((NanoDelay)other).tigger==tigger;
}
publiclonggetDelay(TimeUnitunit){
longn=tigger-System.nanoTime();
returnunit.convert(n,TimeUnit.NANOSECONDS);
}
publiclonggetTriggerTime(){
returntigger;
}
publicintcompareTo(Delayedo){
longi=tigger;
longj=((NanoDelay)o).tigger;
if(i<j){
return-1;
}
if(i>j)
return1;
return0;
}
}
publicstaticvoidmain(String[]args)throwsInterruptedException{
Randomrandom=newRandom();
DelayQueue<NanoDelay>queue=newDelayQueue<NanoDelay>();
for(inti=0;i<5;i++){
queue.add(newNanoDelay(random.nextInt(1000)));
}
longlast=0;
for(inti=0;i<5;i++){
NanoDelaydelay=(NanoDelay)(queue.take());
longtt=delay.getTriggerTime();
System.out.println("Triggertime:"+tt);
if(i!=0){
System.out.println("Data:"+(tt-last));
}
last=tt;
}
}
3.5SynchronousQueue
不是一个真正的队列,因为它不会为队列元素维护任何存储空间,不过它维护一个排队的线程清单,这些线程等待把元素加入(enqueue)队列或者移出(dequeue)队列。也就是说,它非常直接的移交工作,减少了生产者和消费者之间移动数据的延迟时间,另外,也可以更快的知道反馈信息,当移交被接受时,它就知道消费者已经得到了任务。
因为SynChronousQueue没有存储的能力,所以除非另一个线程已经做好准备,否则put和take会一直阻止。它只有在消费者比较充足的时候比较合适。
4双端队列(Deque)
JAVA6中新增了两个容器Deque和BlockingDeque,他们分别扩展了Queue和BlockingQueue。Deque它是一个双端队列,允许高效的在头和尾分别进行插入和删除,它的实现分别是ArrayDeque和LinkedBlockingQueue。
双端队列使得他们能够工作在一种称为“窃取工作”的模式上面。
5最佳实践
1..同步的(synchronized)+HashMap,如果不存在,则计算,然后加入,该方法需要同步。
HashMapcache=newHashMap();
publicsynchronizedVcompute(Aarg){
Vresult=cace.get(arg);
Ii(result==null){
result=c.compute(arg);
Cache.put(result);
}
Returnresult;
}
2.用ConcurrentHashMap代替HashMap+同步.,这样的在get和set的时候也基本能保证原子性。但是会带来重复计算的问题.
Map<A,V>cache=newConcurrentHashMap<A,V>();
publicVcompute(Aarg){
Vresult=cace.get(arg);
Ii(result==null){
result=c.compute(arg);
Cache.put(result);
}
Returnresult;
}
3.采用FutureTask代替直接存储值,这样可以在一开始创建的时候就将Task加入
Map<A,FutureTask<V>>cache=newConcurrentHashMap<A,FutureTask<V>>();
publicVcompute(Aarg){
FutureTask<T>f=cace.get(arg);
//检查再运行的缺陷
Ii(f==null){
Callable<V>evel=newCallable(){
PublicVcall()throws..{
returnc.compute(arg);
}
};
FutureTask<T>ft=newFutureTask<T>(evel);
f=ft;
cache.put(arg,ft;
ft.run();
}
Try{
//阻塞,直到完成
returnf.get();
}cach(){
}
}
4.上面还有检查再运行的缺陷,在高并发的情况下啊,双方都没发现FutureTask,并且都放入Map(后一个被前一个替代),都开始了计算。
这里的解决方案在于,当他们都要放入Map的时候,如果可以有原子方法,那么已经有了以后,后一个FutureTask就加入,并且启动。
publicVcompute(Aarg){
FutureTask<T>f=cace.get(arg);
//检查再运行的缺陷
Ii(f==null){
Callable<V>evel=newCallable(){
PublicVcall()throws..{
returnc.compute(arg);
}
};
FutureTask<T>ft=newFutureTask<T>(evel);
f=cache.putIfAbsent(args,ft);//如果已经存在,返回存在的值,否则返回null
if(f==null){f=ft;ft.run();}//以前不存在,说明应该开始这个计算
else{ft=null;}//取消该计算
}
Try{
//阻塞,直到完成
returnf.get();
}cach(){
}
}
5.上面的程序上来看已经完美了,不过可能带来缓存污染的可能性。如果一个计算被取消或者失败,那么这个键以后的值永远都是失败了;一种解决方案是,发现取消或者失败的task,就移除它,如果有Exception,也移除。
6.另外,如果考虑缓存过期的问题,可以为每个结果关联一个过去时间,并周期性的扫描,清除过期的缓存。(过期时间可以用Delayed接口实现,参考DelayQueue,给他一个大于当前时间XXX的时间,,并且不断减去当前时间,直到返回负数,说明延迟时间已到了。)
Java集合容器总结。
按数据结构主要有以下几类:
1,内置容器:数组
2,list容器:Vetor,Stack,ArrayList,LinkedList,
CopyOnWriteArrayList(1.5),AttributeList(1.5),RoleList(1.5),RoleUnresolvedList(1.5),
ConcurrentLinkedQueue(1.5),ArrayBlockingQueue(1.5),LinkedBlockingQueue(1.5),
PriorityQueue(1.5),PriorityBlockingQueue(1.5),SynchronousQueue(1.5)
3,set容器:HashSet(1.2),LinkedHashSet(1.4),TreeSet(1.2),
CopyOnWriteArraySet(1.5),EnumSet(1.5),JobStateReasons。
4,map容器:Hashtable,HashMap(1.2),TreeMap(1.2),LinkedHashMap(1.4),WeakHashMap(1.2),
IdentityHashMap(1.4),ConcurrentMap(1.5),concurrentHashMap(1.5)。
Set接口继承Collection,但不允许重复,使用自己内部的一个排列机制。
List接口继承Collection,允许重复,以元素安插的次序来放置元素,不会重新排列。
Map接口是一组成对的键-值对象,即所持有的是key-valuepairs。Map中不能有重复的key。拥有自己的内部排列机制。
按新旧主要有以下几类:
Java1.2前的容器:Vector,Stack,Hashtable。
Java1.2的容器:HashSet,TreeSet,HashMap,TreeMap,WeakHashMap
Java1.4的容器:LinkedHashSet,LinkedHashMap,IdentityHashMap,ConcurrentMap,concurrentHashMap
java1.5新增:CopyOnWriteArrayList,AttributeList,RoleList,RoleUnresolvedList,
ConcurrentLinkedQueue,ArrayBlockingQueue,LinkedBlockingQueue,PriorityBlockingQueue
ArrayBlockingQueue,CopyOnWriteArraySet,EnumSet,
未知:JobStateReasons
按线程安全主要有以下几类:
线程安全
一,使用锁:
完全不支持并发:
list容器:Vetor,Stack,CopyOnWriteArrayList,ArrayBlockingQueue,
LinkedBlockingQueue,PriorityBlockingQueue,SynchronousQueue
set容器:CopyOnWriteArraySet
map容器:Hashtable
部分支持并发:
list容器:无
set容器:无
map容器:concurrentHashMap
使用非阻塞算法:
list容器:ConcurrentLinkedQueue
set容器:无
map容器:无
二,非线程安全:
list容器:ArrayList,LinkedList,AttributeList,RoleList,RoleUnresolvedList,PriorityQueue
set容器:HashSet,TreeSet,LinkedHashSet,EnumSet
map容器:HashMap,TreeMap,LinkedHashMap,WeakHashMap,IdentityHashMap,EnumMap
按遍历安全主要有以下几类:
一,遍历安全:
可并发遍历:
list容器:CopyOnWriteArrayList,ConcurrentLinkedQueue
set容器:CopyOnWriteArraySet,EnumSet,EnumMap
map容器:无
不可并发遍历:
list容器:Vetor,Stack,Hashtable,ArrayBlockingQueue,
LinkedBlockingQueue,PriorityBlockingQueue,SynchronousQueue
set容器:无
map容器:Hashtable,concurrentHashMap
注意1:concurrentHashMap迭代器它们不会抛出ConcurrentModificationException。不过,迭代器被设计成每次仅由一个线程使用。
二,遍历不安全:
会抛异常ConcurrentModificationException:
list容器:ArrayList,LinkedList,AttributeList,RoleList,RoleUnresolvedList
set容器:HashSet,TreeSet,TreeSet,LinkedHashSet
map容器:HashMap,TreeMap,LinkedHashMap,WeakHashMap,IdentityHashMap
注意1:返回的迭代器是弱一致的:它们不会抛出ConcurrentModificationException,
也不一定显示在迭代进行时发生的任何映射修改的效果的容器有:
EnumSet,EnumMap
按遍历是否有序性分类
存储数据有序:
list容器:ConcurrentLinkedQueue(1.5),ArrayBlockingQueue(1.5),LinkedBlockingQueue(1.5),
SynchronousQueue(1.5)
set容器:TreeSet(1.2).(他们实现了set接口),
CopyOnWriteArraySet(1.5),EnumSet(1.5),JobStateReasons。
map容器:TreeMap(1.2),LinkedHashMap(1.4)。
一定规则下存储数据有序:
list容器:Stack,Vetor,ArrayList,LinkedList,CopyOnWriteArrayList(1.5),AttributeList(1.5),RoleList(1.5),RoleUnresolvedList(1.5)
set容器:无
map容器:无
遍历无序但移除有序:
list容器:PriorityQueue(1.5),PriorityBlockingQueue(1.5)
set容器:无
map容器:无
无论如何都无序:
list容器:无
set容器:HashSet(1.2),LinkedHashSet(1.4)
map容器:Hashtable,HashMap(1.2),WeakHashMap(1.2),IdentityHashMap(1.4),
ConcurrentMap(1.5),concurrentHashMap(1.5)
可以按自然顺序(参见Comparable)或比较器进行排序的有:
list容器:PriorityQueue(1.5),PriorityBlockingQueue
set容器:TreeSet(1.2)
map容器:TreeMap(1.2)
实现了RandomAccess接口的有:
ArrayList,AttributeList,CopyOnWriteArrayList,RoleList,RoleUnresolvedList,Stack,Vector
RandomAccess接口是List实现所使用的标记接口,用来表明其支持快速(通常是固定时间)随机访问。此接口的主要目的是允许一般的算法更改其行为,从而在将其应用到随机或连续访问列表时能提供良好的性能。
在对List特别的遍历算法中,要尽量来判断是属于RandomAccess(如ArrayList)还是SequenceAccess(如LinkedList),
因为适合RandomAccessList的遍历算法,用在SequenceAccessList上就差别很大,
即对于实现了RandomAccess接口的类实例而言,此循环
for(inti=0,i<list.size();i++)
list.get(i);
的运行速度要快于以下循环:
for(Iteratori=list.iterator();i.hasNext();)
i.next();