Java 并发编程:同步容器
为了方便编写出线程安全的程序,Java里面提供了一些线程安全类和并发工具,比如:同步容器、并发容器、阻塞队列、Synchronizer(比如CountDownLatch)。今天我们就来讨论下同步容器。
一.为什么会出现同步容器?
在Java的集合容器框架中,主要有四大类别:List、Set、Queue、Map。
List、Set、Queue接口分别继承了Collection接口,Map本身是一个接口。
注意Collection和Map是一个顶层接口,而List、Set、Queue则继承了Collection接口,分别代表数组、集合和队列这三大类容器。
像ArrayList、LinkedList都是实现了List接口,HashSet实现了Set接口,而Deque(双向队列,允许在队首、队尾进行入队和出队操作)继承了Queue接口,PriorityQueue实现了Queue接口。另外LinkedList(实际上是双向链表)实现了了Deque接口。
像ArrayList、LinkedList、HashMap这些容器都是非线程安全的。
如果有多个线程并发地访问这些容器时,就会出现问题。
因此,在编写程序时,必须要求程序员手动地在任何访问到这些容器的地方进行同步处理,这样导致在使用这些容器的时候非常地不方便。
所以,Java提供了同步容器供用户使用。
二.Java中的同步容器类
在Java中,同步容器主要包括2类:
1)Vector、Stack、HashTable
2)Collections类中提供的静态工厂方法创建的类
Vector实现了List接口,Vector实际上就是一个数组,和ArrayList类似,但是Vector中的方法都是synchronized方法,即进行了同步措施。
Stack也是一个同步容器,它的方法也用synchronized进行了同步,它实际上是继承于Vector类。
HashTable实现了Map接口,它和HashMap很相似,但是HashTable进行了同步处理,而HashMap没有。
Collections类是一个工具提供类,注意,它和Collection不同,Collection是一个顶层的接口。在Collections类中提供了大量的方法,比如对集合或者容器进行排序、查找等操作。最重要的是,在它里面提供了几个静态工厂方法来创建同步容器类,如下图所示:
三、同步容器的缺陷
从同步容器的具体实现源码可知,同步容器中的方法采用了synchronized进行了同步,那么很显然,这必然会影响到执行性能,另外,同步容器就一定是真正地完全线程安全吗?不一定,这个在下面会讲到。
我们首先来看一下传统的非同步容器和同步容器的性能差异,我们以ArrayList和Vector为例:
1.性能问题
由于Vector中对每个公有方法都进行了同步,使得每次只有一个线程能够访问容器的状态。这可能会带来性能问题。
因此为了解决同步容器的性能问题,在Java 1.5中提供了并发容器,位于java.util.concurrent目录下,并发容器的相关知识将在下一篇文章中讲述。
2.同步容器的线程安全问题
同步容器类都是线程安全的,但在某些情况下可能需要额外的客户端加锁来保护复合操作。容器上常见的复合操作包括:迭代(反复访问元素,直到遍历完容器中所有元素)、跳转(根据指定顺序找到当前元素的下一个元素)以及条件运算,例如“若没有则添加”。在同步容器类中,这些复合操作在没有客户端加锁的情况下仍然是线程安全的,但当其他线程并发地修改容器时,它们可能会表现出意料之外的行为。
下面的程序给出了对 Vector 进行操作的两个方法:getLast 和 deleteLast,它们都会执行“先检查再执行”操作。每个方法首先都获得数组的大小,然后通过结果来获取或删除最后一个元素。
public class UnsafeVectorHelpers {
public static Object getLast(Vector<?> list) throws InterruptedException {
int lastIndex = list.size() - 1;
return list.get(lastIndex);
}
public static Object deleteLast(Vector<?> list) {
int lastIndex = list.size() - 1;
return list.remove(lastIndex);
}
public static void main(String[] args) throws InterruptedException {
Vector<Object> vector = new Vector<Object>();
for(int i=1; i<=1000; i++) {
vector.add(i);
}
Thread threadA = new Thread(new Runnable() {
@Override
public void run() {
while(!vector.isEmpty() ) {
try {
System.out.println("query: " + getLast(vector));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
Thread threadB = new Thread(new Runnable() {
@Override
public void run() {
while(!vector.isEmpty() ) {
System.out.println("delete: " + deleteLast(vector));
}
}
});
threadA.start();
threadB.start();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
交替执行 getLast 和 deleteLast 可能会出现这样一种情况,getLast 中获取到list.size(),但是还没调用list.get(lastIndex)方法获取元素;而deleteLast 也获取到了这一size,并且执行了 list.remove 方法,这时 list.get 再使用 lastIndex 去查询就会报 java.lang.ArrayIndexOutOfBoundsException 的错。
我们可以通过客户端加锁,来将“先检查再执行”的复合操作编程原子操作。如下:
public static Object getLast(Vector<?> list) throws InterruptedException {
synchronized (list) {
if(!list.isEmpty()) {
int lastIndex = list.size() - 1;
return list.get(lastIndex);
}
return null;
}
}
public static Object deleteLast(Vector<?> list) {
synchronized (list) {
if(!list.isEmpty()) {
int lastIndex = list.size() - 1;
return list.remove(lastIndex);
}
return null;
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
同样的,在多线程环境中,在调用 size 和相应的 get 之间,也可能会出现上面类似的 ArrayIndexOutOfBoundsException 错误。
for(int i=vector.size()-1; i>=0; i--) {
System.out.println(vector.get(i));
}
- 1
- 2
- 3
如果这时,其他线程并发修改了 Vector,则可能导致出现问题,例如上面的 deleteLast 类似的删除操作。
同样的,我们可以通过客户端加锁来解决不可靠迭代的问题。
synchronized (vector) {
for(int i=vector.size()-1; i>=0; i--) {
System.out.println(vector.get(i));
}
}
- 1
- 2
- 3
- 4
- 5
2.1 迭代器与 ConcurrentModificationException
同步容器的迭代器中没有考虑到并发修改的问题,表现出的行为是“及时失败”(fail-fast)的。这意味着,当它们发现容器在迭代过程中被修改时,就会抛出一个 ConcurrentModificationException 异常。
下面的程序说明了如何使用 for-each 循环语法对 List 容器进行迭代。
public class UnsafeVectorHelpers3 {
public static Object deleteLast(Vector<?> list) {
int lastIndex = list.size() - 1;
return list.remove(lastIndex);
}
public static void main(String[] args) throws InterruptedException {
Vector<Object> vector = new Vector<Object>();
for(int i=1; i<=1000; i++) {
vector.add(i);
}
Thread threadA = new Thread(new Runnable() {
@Override
public void run() {
while(!vector.isEmpty()) {
System.out.println("delete: " + deleteLast(vector));
}
}
});
threadA.start();
// 可能抛出 ConcurrentModificationException 异常
for (Object obj : vector) {
System.out.println(obj);
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
这种“及时失败”的迭代器并不是一种完备的处理机制,而只是“善意地”捕获并发错误,因此只能作为并发问题的预警指示器。将计数器的变化与容器关联起来:如果在迭代器遍历期间计数器被修改,那么 hasNext 或者 next 将抛出 ConcurrentModificationException。
Vector 继承自 AbstractList,Abstract 里有一个protected的属性 modCount。
protected transient int modCount = 0;
- 1
Vector 中添加/删除元素时,都会将该 modCount 的值加1,如 removeElement 方法
public synchronized boolean removeElement(Object obj) {
modCount++;
int i = indexOf(obj);
if (i >= 0) {
removeElementAt(i);
return true;
}
return false;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
而使用 foreach 遍历时,会初始一个 Itr 实例,expectedModCount 成员变量的初始化为当前 Vector 的modCount的值。当进行迭代遍历时,如果其它线程在 Vector 实例上添加或者删除了元素,modCount 就会发生变化,expectedModCount 就会与 modCount 的值不等,当调用 hasNext 方法或者 next 方法时,会调用 checkForComodification 方法,就会检测到 expectedModCount 与 modCount 的值不等,抛出 ConcurrentModificationException 异常。
/**
* Returns an iterator over the elements in this list in proper sequence.
*
* <p>The returned iterator is <a href="#fail-fast"><i>fail-fast</i></a>.
*
* @return an iterator over the elements in this list in proper sequence
*/
public synchronized Iterator<E> iterator() {
return new Itr();
}
/**
* An optimized version of AbstractList.Itr
*/
private class Itr implements Iterator<E> {
int cursor; // index of next element to return
int lastRet = -1; // index of last element returned; -1 if no such
int expectedModCount = modCount;
public boolean hasNext() {
// Racy but within spec, since modifications are checked
// within or after synchronization in next/previous
return cursor != elementCount;
}
public E next() {
synchronized (Vector.this) {
checkForComodification();
int i = cursor;
if (i >= elementCount)
throw new NoSuchElementException();
cursor = i + 1;
return elementData(lastRet = i);
}
}
public void remove() {
if (lastRet == -1)
throw new IllegalStateException();
synchronized (Vector.this) {
checkForComodification();
Vector.this.remove(lastRet);
expectedModCount = modCount;
}
cursor = lastRet;
lastRet = -1;
}
// ....
final void checkForComodification() {
if (modCount != expectedModCount)
throw new ConcurrentModificationException();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
要解决迭代器遍历过程中可能出现的并发修改问题,防止出现 ConcurrentModificationException 异常,有两种办法。
一种是在客户端对并发容器实例进行加锁
synchronized (vector) {
for (Object obj : vector) {
System.out.println(obj);
}
}
- 1
- 2
- 3
- 4
- 5
另外一种是,对并发容器实例进行拷贝,然后在拷贝得到的副本上进行遍历。这种方式的好坏取决于多个因素,包括容器的大小,在每个元素上执行的工作,迭代操作相对于容器其他操作的调用频率,以及在响应时间和吞吐量等方面的需求。
Object[] arr;
// 复制期间需要加锁
synchronized (vector) {
arr = new Object[vector.size()];
vector.toArray(arr);
}
if(null != arr) {
for (int i = 0; i < arr.length; i++) {
System.out.println("query: " + arr[i]);
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
2.3 隐藏迭代器
容器的 toString 、hashCode 和 equals 等方法会间接地执行迭代操作。同样,containsAll、removeAll 和 retainAll 等方法,以及把容器作为参数的构造函数,都会对容器进行迭代。所有这些间接的迭代操作都可能抛出 ConcurrentModificationException 异常。如下面的 Vector 的父类 AbstractList 的 equals 方法和 hasCode 方法
public boolean equals(Object o) {
if (o == this)
return true;
if (!(o instanceof List))
return false;
ListIterator<E> e1 = listIterator();
ListIterator<?> e2 = ((List<?>) o).listIterator();
while (e1.hasNext() && e2.hasNext()) {
E o1 = e1.next();
Object o2 = e2.next();
if (!(o1==null ? o2==null : o1.equals(o2)))
return false;
}
return !(e1.hasNext() || e2.hasNext());
}
/**
* Returns the hash code value for this list.
*
* <p>This implementation uses exactly the code that is used to define the
* list hash function in the documentation for the {@link List#hashCode}
* method.
*
* @return the hash code value for this list
*/
public int hashCode() {
int hashCode = 1;
for (E e : this)
hashCode = 31*hashCode + (e==null ? 0 : e.hashCode());
return hashCode;
}