Java Concurrent Programming (6)
6.并发集合类
Hashtable是一个易于使用,线程安全的集合类,它的线程安全性是凭借代价换来的--Hashtable中的所有方法都是同步的。HashMap是Hashtable的继承者,通过一个不同步的基类和一个同步的包装器Collections.synchronizedMap解决了线程安全性问题。通过将基本的功能从线程安全性中分离开来,Collections.synchronizedMap允许需要同步的用户可以拥有同步,而不需要同步的用户不需要为同步付出代价。Hashtable和Collections.synchronizedMap通过同步每个方法获得线程安全。这意味着当一个线程执行一个Map方法时,无论其他线程要对Map进行什么样操作,都不能执行,直到第一个线程结束才可以。这样,就会有两个不足之处:首先,可伸缩性差,因为一次只能有一个线程可以访问hash表。其次,许多公用的混合操作仍需要外部同步,比如put-if-absent操作,必须要外部同步来保证数据的争用。java.util.concurrent包添加了多个新的线程安全集合类(ConcurrentHashMap、CopyOnWriteArrayList和CopyOnWriteArraySet)。这些类的目的是提供高性能、高度可伸缩性、线程安全的基本集合类型版本。
6.1ConcurrentHashMap类
ConcurrentHashMap类是对Map的线程安全的实现,比起synchronizedMap来,它提供了好得多的并发性。多个读操作几乎总可以并发地执行,ConcurrentHashMap的读操作并不需要加锁,是因为其HashEntry被定义为final(HashEntry代表每个hash链中的一个节点),而value被修饰为volatile的原因:
static final class HashEntry<K,V> { final K key; final int hash; volatile V value; final HashEntry<K,V> next; }
ConcurrentHashMap允许多个修改操作并发的执行,是因为其使用了锁分离技术,也就是说,它用多个锁来控制对hash表不同部分的修改操作。ConcurrentHashMap内部使用段(Segment)来表示这些不同的部分,每个段其实就是一个小的hashtable,它们有自己的锁。只要多个修改操作发生在不同的段上,它们就可以并发进行,其也被定义为final,保证了数据的不变性。
final Segment[] segments;
每当进行修改和删除的时候,首先判断是否在同一段内,只要不在同一段内,都可以进行并发的操作。
public V put(K key, V value) { if (value == null) throw new NullPointerException(); int hash = hash(key); return segmentFor(hash).put(key, hash, value, false); } public V remove(Object key) { int hash = hash(key); return segmentFor(hash).remove(key, hash, null); }
ConcurrentHashMap返回的迭代器是弱一致的,并且返回的Iterators将每次最多返回一个元素,意味着它们将不抛出ConcurrentModificationException。
final class KeyIterator extends HashIterator implements Iterator<K>, Enumeration<K> final class ValueIterator extends HashIterator implements Iterator<V>, Enumeration<V> final class EntryIterator extends HashIterator implements Map.Entry<K,V>,Iterator<Entry<K,V>>
6.2CopyOnWriteArrayList和CopyOnWriteArraySet
CopyOnWriteArrayList和CopyOnWriteArraySet,适合于读操作通常大大超过写操作的情况。在CopyOnWriteArrayList和CopyOnWriteArraySet类中,所有可变的(mutable)操作都首先取得后台数组的副本,对副本进行更改,然后替换副本。这种做法保证了在遍历自身更改的集合时,永远不会抛出ConcurrentModificationException。遍历集合会用原来的集合完成,而在以后的操作中使用更新后的集合。
6.3Queue与BlockingQueue
JDK5.0还提供了两个新集合接口——Queue和BlockingQueue。Queue继承自Collection,与List类似,但它只允许从后面插入,从前面删除。通过消除List的随机访问要求,可以创建比现有ArrayList和LinkedList实现性能更好的Queue实现。因为List的许多应用程序实际上不需要随机访问,所以Queue通常可以替代List,来获得更好的性能。其定义和方法如下:
public interface Queue<E> extends Collection<E> { boolean offer(E o); E poll(); E remove(); E peek(); E element(); }
一个队列就是一个先进先出FIFO的数据结构。队列有大小的限制,因此如果想在一个满的队列中继续添加元素,队列会拒绝次元素。如果此时调用的是add方法,那么会抛出异常。队里中的offer方法表示,如果向已满队列中继续添加元素,它会返回false。poll和remove方法都是从队列中删除第一个元素(head)。区别在于,如果队列为空,remove会抛出异常,而poll会返回null。peek和element用于在队列的头部查询元素,与remove方法相似,如果队列为空时,element抛出异常,peek返回null。
再复杂一点的是新的java.util.AbstractQueue类。这个类的工作方式类似于java.util.AbstractList和java.util.AbstractSet类。在创建自定义集合时,不用自己实现整个接口,只是继承抽象实现并填入细节。使用AbstractQueue时,必须为方法offer(),poll()和peek()提供实现。像add()和addAll()这样的方法修改为使用offer(),而clear()和remove()使用poll()。最后,element()使用peek()。当然可以在子类中提供这些方法的优化实现,但是不是必须这么做。而且,不必创建自己的子类,可以使用几个内置的实现,其中两个是不阻塞队列:PriorityQueue和ConcurrentLinkedQueue。PriorityQueue和ConcurrentLinkedQueue类在CollectionFramework中加入两个具体集合实现。PriorityQueue类实质上维护了一个有序列表。加入到Queue中的元素根据它们的天然排序(通过其java.util.Comparable实现)或者根据传递给构造函数的java.util.Comparator实现来定位。ConcurrentLinkedQueue是基于链接节点的、线程安全的队列。并发访问不需要同步。因为它在队列的尾部添加元素并从头部删除它们,所以只要不需要知道队列的大小,ConcurrentLinkedQueue对公共集合的共享访问就可以工作得很好。收集关于队列大小的信息会很慢,需要遍历队列。
BlockingQueue继承自Queue,添加了两个操作:获取元素时等待队列变为非空,以及存储元素时等待空间变得可用。其定义和新添加方法如下:
public interface BlockingQueue<E> extends Queue<E> { E take() throws InterruptedException; void put(E o) throws InterruptedException; int drainTo(Collection<? super E> c); int drainTo(Collection<? super E> c, int maxElements); }
take表示当删除队列中第一个元素时,如果队列为空,会一直等待队列变为非空。put表示向已满队列添加元素,会一直等待队列空间变得可用。drainTo表示将队列中的元素添加到指定集合中,返回添加的个数。利用take和put方法,实现生产者和消费者:
class Producer implements Runnable { private final BlockingQueue queue; public Producer(BlockingQueue q) { queue = q; } public void run() { try { while (true) { queue.put(produce()); } } catch (InterruptedException e) { e.printStackTrace(); } } private Object produce() { return null; } } class Consumer implements Runnable { private final BlockingQueue queue; public Consumer(BlockingQueue q) { queue = q; } public void run() { try { while (true) { consume(queue.take()); } } catch (InterruptedException e) { e.printStackTrace(); } } private void consume(Object x) { // } }
BlockingQueue的5个实现类分别为:
ArrayBlockingQueue: 一个由数组支持的有界队列。 LinkedBlockingQueue: 一个由链接节点支持的可选有界队列。 PriorityBlockingQueue: 一个由优先级堆支持的无界优先级队列。 DelayQueue: 一个由优先级堆支持的、基于时间的调度队列。 SynchronousQueue: 一个利用 BlockingQueue 接口的简单聚集(rendezvous)机制。
ArrayBlockingQueue和LinkedBlockingQueue几乎相同,只是在后备存储器方面有所不同,ArrayBlockingQueue为数组存储,LinkedBlockingQueue为链接节点存储,LinkedBlockingQueue中的头元素为在队列中时间最长的元素。LinkedBlockingQueue并不总是有容量界限。无大小界限的LinkedBlockingQueue类在添加元素时永远不会有阻塞队列的等待(至少在其中有Integer.MAX_VALUE元素之前不会)。
PriorityBlockingQueue是具有无界限容量的队列,它利用所包含元素的Comparable排序顺序来以逻辑顺序维护元素。可以将它看作TreeSet的可能替代物。例如,在队列中加入字符串One,Two,Three和Four会导致Four被第一个取出来。对于没有天然顺序的元素,可以为构造函数提供一个Comparator。不过对PriorityBlockingQueue有一个技巧。从iterator()返回的Iterator实例不需要以优先级顺序返回元素。如果必须以优先级顺序遍历所有元素,那么让它们都通过toArray()方法并自己对它们排序,像Arrays.sort(pq.toArray())。
DelayQueue的定义如下,所以像其中添加的元素必须实现Delay接口(只有一个方法longgetDelay(TimeUnitunit)):
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E>
因为队列的大小没有界限,使得添加可以立即返回,但是在延迟时间过去之前,不能从队列中取出元素。如果多个元素完成了延迟,那么最早失效/失效时间最长的元素将第一个取出。
SynchronousQueue类是最简单的。它没有内部容量。它就像线程之间的手递手机制。在队列中加入一个元素的生产者会等待另一个线程的消费者。当这个消费者出现时,这个元素就直接在消费者和生产者之间传递,永远不会加入到阻塞队列中。