Java 并发编程:同步容器

为了方便编写出线程安全的程序,Java里面提供了一些线程安全类和并发工具,比如:同步容器、并发容器、阻塞队列、Synchronizer(比如CountDownLatch)。今天我们就来讨论下同步容器。

一.为什么会出现同步容器?

在Java的集合容器框架中,主要有四大类别:List、Set、Queue、Map。

List、Set、Queue接口分别继承了Collection接口,Map本身是一个接口。

注意Collection和Map是一个顶层接口,而List、Set、Queue则继承了Collection接口,分别代表数组、集合和队列这三大类容器。

像ArrayList、LinkedList都是实现了List接口,HashSet实现了Set接口,而Deque(双向队列,允许在队首、队尾进行入队和出队操作)继承了Queue接口,PriorityQueue实现了Queue接口。另外LinkedList(实际上是双向链表)实现了了Deque接口。

像ArrayList、LinkedList、HashMap这些容器都是非线程安全的。

如果有多个线程并发地访问这些容器时,就会出现问题。

因此,在编写程序时,必须要求程序员手动地在任何访问到这些容器的地方进行同步处理,这样导致在使用这些容器的时候非常地不方便。

所以,Java提供了同步容器供用户使用。

二.Java中的同步容器类

在Java中,同步容器主要包括2类:

1)Vector、Stack、HashTable

2)Collections类中提供的静态工厂方法创建的类

Vector实现了List接口,Vector实际上就是一个数组,和ArrayList类似,但是Vector中的方法都是synchronized方法,即进行了同步措施。

Stack也是一个同步容器,它的方法也用synchronized进行了同步,它实际上是继承于Vector类。

HashTable实现了Map接口,它和HashMap很相似,但是HashTable进行了同步处理,而HashMap没有。

Collections类是一个工具提供类,注意,它和Collection不同,Collection是一个顶层的接口。在Collections类中提供了大量的方法,比如对集合或者容器进行排序、查找等操作。最重要的是,在它里面提供了几个静态工厂方法来创建同步容器类,如下图所示:

Java 并发编程:同步容器

三、同步容器的缺陷

从同步容器的具体实现源码可知,同步容器中的方法采用了synchronized进行了同步,那么很显然,这必然会影响到执行性能,另外,同步容器就一定是真正地完全线程安全吗?不一定,这个在下面会讲到。

我们首先来看一下传统的非同步容器和同步容器的性能差异,我们以ArrayList和Vector为例:

1.性能问题

由于Vector中对每个公有方法都进行了同步,使得每次只有一个线程能够访问容器的状态。这可能会带来性能问题。

因此为了解决同步容器的性能问题,在Java 1.5中提供了并发容器,位于java.util.concurrent目录下,并发容器的相关知识将在下一篇文章中讲述。

2.同步容器的线程安全问题

同步容器类都是线程安全的,但在某些情况下可能需要额外的客户端加锁来保护复合操作。容器上常见的复合操作包括:迭代(反复访问元素,直到遍历完容器中所有元素)、跳转(根据指定顺序找到当前元素的下一个元素)以及条件运算,例如“若没有则添加”。在同步容器类中,这些复合操作在没有客户端加锁的情况下仍然是线程安全的,但当其他线程并发地修改容器时,它们可能会表现出意料之外的行为。

下面的程序给出了对 Vector 进行操作的两个方法:getLast 和 deleteLast,它们都会执行“先检查再执行”操作。每个方法首先都获得数组的大小,然后通过结果来获取或删除最后一个元素。

public class UnsafeVectorHelpers {

public static Object getLast(Vector<?> list) throws InterruptedException {

int lastIndex = list.size() - 1;

return list.get(lastIndex);

}

public static Object deleteLast(Vector<?> list) {

int lastIndex = list.size() - 1;

return list.remove(lastIndex);

}

public static void main(String[] args) throws InterruptedException {

Vector<Object> vector = new Vector<Object>();

for(int i=1; i<=1000; i++) {

vector.add(i);

}

Thread threadA = new Thread(new Runnable() {

@Override

public void run() {

while(!vector.isEmpty() ) {

try {

System.out.println("query: " + getLast(vector));

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

});

Thread threadB = new Thread(new Runnable() {

@Override

public void run() {

while(!vector.isEmpty() ) {

System.out.println("delete: " + deleteLast(vector));

}

}

});

threadA.start();

threadB.start();

}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

交替执行 getLast 和 deleteLast 可能会出现这样一种情况,getLast 中获取到list.size(),但是还没调用list.get(lastIndex)方法获取元素;而deleteLast 也获取到了这一size,并且执行了 list.remove 方法,这时 list.get 再使用 lastIndex 去查询就会报 java.lang.ArrayIndexOutOfBoundsException 的错。

我们可以通过客户端加锁,来将“先检查再执行”的复合操作编程原子操作。如下:

public static Object getLast(Vector<?> list) throws InterruptedException {

synchronized (list) {

if(!list.isEmpty()) {

int lastIndex = list.size() - 1;

return list.get(lastIndex);

}

return null;

}

}

public static Object deleteLast(Vector<?> list) {

synchronized (list) {

if(!list.isEmpty()) {

int lastIndex = list.size() - 1;

return list.remove(lastIndex);

}

return null;

}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

同样的,在多线程环境中,在调用 size 和相应的 get 之间,也可能会出现上面类似的 ArrayIndexOutOfBoundsException 错误。

for(int i=vector.size()-1; i>=0; i--) {

System.out.println(vector.get(i));

}

  • 1
  • 2
  • 3

如果这时,其他线程并发修改了 Vector,则可能导致出现问题,例如上面的 deleteLast 类似的删除操作。

同样的,我们可以通过客户端加锁来解决不可靠迭代的问题。

synchronized (vector) {

for(int i=vector.size()-1; i>=0; i--) {

System.out.println(vector.get(i));

}

}

  • 1
  • 2
  • 3
  • 4
  • 5

2.1 迭代器与 ConcurrentModificationException

同步容器的迭代器中没有考虑到并发修改的问题,表现出的行为是“及时失败”(fail-fast)的。这意味着,当它们发现容器在迭代过程中被修改时,就会抛出一个 ConcurrentModificationException 异常。

下面的程序说明了如何使用 for-each 循环语法对 List 容器进行迭代。

public class UnsafeVectorHelpers3 {

public static Object deleteLast(Vector<?> list) {

int lastIndex = list.size() - 1;

return list.remove(lastIndex);

}

public static void main(String[] args) throws InterruptedException {

Vector<Object> vector = new Vector<Object>();

for(int i=1; i<=1000; i++) {

vector.add(i);

}

Thread threadA = new Thread(new Runnable() {

@Override

public void run() {

while(!vector.isEmpty()) {

System.out.println("delete: " + deleteLast(vector));

}

}

});

threadA.start();

// 可能抛出 ConcurrentModificationException 异常

for (Object obj : vector) {

System.out.println(obj);

}

}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

这种“及时失败”的迭代器并不是一种完备的处理机制,而只是“善意地”捕获并发错误,因此只能作为并发问题的预警指示器。将计数器的变化与容器关联起来:如果在迭代器遍历期间计数器被修改,那么 hasNext 或者 next 将抛出 ConcurrentModificationException。

Vector 继承自 AbstractList,Abstract 里有一个protected的属性 modCount。

protected transient int modCount = 0;

  • 1

Vector 中添加/删除元素时,都会将该 modCount 的值加1,如 removeElement 方法

public synchronized boolean removeElement(Object obj) {

modCount++;

int i = indexOf(obj);

if (i >= 0) {

removeElementAt(i);

return true;

}

return false;

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

而使用 foreach 遍历时,会初始一个 Itr 实例,expectedModCount 成员变量的初始化为当前 Vector 的modCount的值。当进行迭代遍历时,如果其它线程在 Vector 实例上添加或者删除了元素,modCount 就会发生变化,expectedModCount 就会与 modCount 的值不等,当调用 hasNext 方法或者 next 方法时,会调用 checkForComodification 方法,就会检测到 expectedModCount 与 modCount 的值不等,抛出 ConcurrentModificationException 异常。

/**

* Returns an iterator over the elements in this list in proper sequence.

*

* <p>The returned iterator is <a href="#fail-fast"><i>fail-fast</i></a>.

*

* @return an iterator over the elements in this list in proper sequence

*/

public synchronized Iterator<E> iterator() {

return new Itr();

}

/**

* An optimized version of AbstractList.Itr

*/

private class Itr implements Iterator<E> {

int cursor; // index of next element to return

int lastRet = -1; // index of last element returned; -1 if no such

int expectedModCount = modCount;

public boolean hasNext() {

// Racy but within spec, since modifications are checked

// within or after synchronization in next/previous

return cursor != elementCount;

}

public E next() {

synchronized (Vector.this) {

checkForComodification();

int i = cursor;

if (i >= elementCount)

throw new NoSuchElementException();

cursor = i + 1;

return elementData(lastRet = i);

}

}

public void remove() {

if (lastRet == -1)

throw new IllegalStateException();

synchronized (Vector.this) {

checkForComodification();

Vector.this.remove(lastRet);

expectedModCount = modCount;

}

cursor = lastRet;

lastRet = -1;

}

// ....

final void checkForComodification() {

if (modCount != expectedModCount)

throw new ConcurrentModificationException();

}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55

要解决迭代器遍历过程中可能出现的并发修改问题,防止出现 ConcurrentModificationException 异常,有两种办法。

一种是在客户端对并发容器实例进行加锁

synchronized (vector) {

for (Object obj : vector) {

System.out.println(obj);

}

}

  • 1
  • 2
  • 3
  • 4
  • 5

另外一种是,对并发容器实例进行拷贝,然后在拷贝得到的副本上进行遍历。这种方式的好坏取决于多个因素,包括容器的大小,在每个元素上执行的工作,迭代操作相对于容器其他操作的调用频率,以及在响应时间和吞吐量等方面的需求。

Object[] arr;

// 复制期间需要加锁

synchronized (vector) {

arr = new Object[vector.size()];

vector.toArray(arr);

}

if(null != arr) {

for (int i = 0; i < arr.length; i++) {

System.out.println("query: " + arr[i]);

}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

2.3 隐藏迭代器

容器的 toString 、hashCode 和 equals 等方法会间接地执行迭代操作。同样,containsAll、removeAll 和 retainAll 等方法,以及把容器作为参数的构造函数,都会对容器进行迭代。所有这些间接的迭代操作都可能抛出 ConcurrentModificationException 异常。如下面的 Vector 的父类 AbstractList 的 equals 方法和 hasCode 方法

public boolean equals(Object o) {

if (o == this)

return true;

if (!(o instanceof List))

return false;

ListIterator<E> e1 = listIterator();

ListIterator<?> e2 = ((List<?>) o).listIterator();

while (e1.hasNext() && e2.hasNext()) {

E o1 = e1.next();

Object o2 = e2.next();

if (!(o1==null ? o2==null : o1.equals(o2)))

return false;

}

return !(e1.hasNext() || e2.hasNext());

}

/**

* Returns the hash code value for this list.

*

* <p>This implementation uses exactly the code that is used to define the

* list hash function in the documentation for the {@link List#hashCode}

* method.

*

* @return the hash code value for this list

*/

public int hashCode() {

int hashCode = 1;

for (E e : this)

hashCode = 31*hashCode + (e==null ? 0 : e.hashCode());

return hashCode;

}

相关推荐