CyclicBarrier源码分析
概述
CyclicBarrier是一个同步辅助类,它允许一组线程相互等待,直到达到某个公共屏障点。并且在释放等待线程之后,CyclicBarrier是可以重复使用的。
简单使用
下面这段代码利用了CyclicBarrier来使得线程创建后相互等待,直到所有的线程都准备好,以此来使多个线程同时执行。
public class CyclicBarrierTest { public static void main(String[] args) { CyclicBarrierTest cyclicBarrierTest=new CyclicBarrierTest(); cyclicBarrierTest.runThread(); } //有10个线程需要相互等待 CyclicBarrier cyclicBarrier=new CyclicBarrier(10); /** * 创建一个线程 * @return */ private Thread createThread(int i){ Thread thread=new Thread(new Runnable() { @Override public void run() { try { //线程在此相互等待,直到所有线程都准备好 cyclicBarrier.await(); System.out.println("thread"+Thread.currentThread().getName()+"准备完毕"+System.currentTimeMillis()); }catch (InterruptedException e){ e.printStackTrace(); }catch (BrokenBarrierException e){ e.printStackTrace(); } } }); thread.setName("thread-"+i); return thread; } public void runThread(){ ExecutorService executorService= Executors.newFixedThreadPool(10); try { for(int i=0;i<10;i++){ Thread.sleep(100); executorService.submit(createThread(i)); } }catch (InterruptedException e){ e.printStackTrace(); } } }
源码分析
核心属性
private static class Generation { //是否被销毁 boolean broken = false;//false代表没被销毁 } /** The lock for guarding barrier entry ,守护入口的锁*/ private final ReentrantLock lock = new ReentrantLock(); /** Condition to wait on until tripped,等待条件 */ private final Condition trip = lock.newCondition(); /** The number of parties,要屏障的线程数 */ private final int parties; /* The command to run when tripped ,当线程都到待barrier,需要运行的内容*/ private final Runnable barrierCommand; /** The current generation ,记录当前barrier状态的对象*/ private Generation generation = new Generation(); /** * Number of parties still waiting. Counts down from parties to 0 * on each generation. It is reset to parties on each new * generation or when broken. *当前等待barrier到达的线程的数量 */ private int count;
重要方法分析
await方法
int await()
方法的具体实现如下:
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }
从这个方法可以看出,实际上起作用的就是dowait(false, 0L);
.
那我们来看一下dowait(false, 0L);
的具体实现:
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; //获取锁,整段代码都使用该锁进行同步 lock.lock(); try { //获取当前的generation final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); //如果线程被中断,就会终止Barrier,唤醒所有的等待线程 throw new InterruptedException(); } //count就是我们实例化CyclicBarrier时传入的值 //此时index代表当前是最后几个等待的线程 int index = --count; if (index == 0) { // tripped //如过当前线程是最后一个等待的线程 //它都已经调用await,说明所有线程都已经到达 //屏障点了,可以唤醒所有线程了 boolean ranAction = false; try { //如果有barrierCommand,就运行它 final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; //更新Barrier状态,并唤醒所有线程 nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out //自旋等待,直到所有线程都到达屏障点 //或者发生中断 //或者generation被销毁 //或者超时 for (;;) { try { if (!timed) trip.await(); //在此利用lock的Condition阻塞,当前线程 else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); //超时就销毁当前Barrier throw new TimeoutException(); } } } finally { lock.unlock(); } }
在这个方法中,还有几个比较重要的方法。
用于销毁Barrier的void breakBarrier()
方法
private void breakBarrier() { //将当前gengeration标记为弃用状态 generation.broken = true; count = parties;//将等待barrier的线程数量,恢复到之前的值 trip.signalAll(); //唤醒锁上的Condition上等待的所有线程 }
用于重置CyclicBarrier和唤醒所有等待线程的void nextGeneration()
方法实现如下:
private void nextGeneration() { // signal completion of last generation trip.signalAll(); //唤醒所有的等待线程 // set up next generation count = parties;//将等待线程数复原,以便CyclicBarrier下次重复使用 generation = new Generation(); //复原generation }
整体看下来,这个CyclicBarrier的实现还是比较简单,我们在实例化CyclicBarrier的时候就指定了一个需要相互等待的线程数。每当一个线程调用await方法的时候,都会去判断,自己是不是最后一个线程,如果自己是最后一个线程,那么说明其它线程都在阻塞等待自己,那么就去唤醒所有等待的线程。如果自己不是最后一个线程,那么就需要去等待其它的线程,那么就去自旋,或者阻塞。
在整个源码中比较重要的一点就是CyclicBarrier内部利用了一个ReentrantLock利用它来对代码块加锁,让线程在它的Condition上阻塞。
每个CyclicBarrier内部都维护了一个Generation对象,它主要是记录当前CyclicBarrier的状态,即是否被弃用。因为CyclicBarrier是可以重复使用的,因此在所有线程都到达屏障点的时候,会调用nextGeneration()
来重置整个CyclicBarrier,方便下次使用。
还需要注意的是,CyclicBarrier是会响应中断,一旦发生中断,就会重置CyclicBarrier,并唤醒等待的线程。