读CyclicBarrier源码
//一个循环的屏障。所有的线程在屏障处等待其他线程执行完毕。然后再各自执行。 //先看构造函数 public CyclicBarrier(int parties) { this(parties, null); } //barrierAction代表在屏障上等待的最后一个线程已经执行完后,执行的runnable public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } //等待其他线程执行到这 public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen; } } //如果count-1不为0则线程在trip条件上等待。否则唤醒在trip条件上等待的线程。 private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); //如果线程已经被中断了 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; //如果已经全部执行完了 if (index == 0) { // tripped boolean ranAction = false; try { //执行runnable final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) //在trip条件上等待 trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } } //中断此barrier将所有在trip条件上等待的线程加入唤醒的队列 private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); } //重新生成下一代CyclicBarrier private void nextGeneration() { //唤醒在trip上等待的所有线程 trip.signalAll(); // set up next generation //设置下一代操作(重用CyclicBarrier) count = parties; generation = new Generation(); } //在此时间段等待 public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); } //是否已经被损坏 public boolean isBroken() { final ReentrantLock lock = this.lock; lock.lock(); try { return generation.broken; } finally { lock.unlock(); } } //返回要求启动此CyclicBarrier的参与者数量。 public int getParties() { return parties; } //还有几个参与者在等待 public int getNumberWaiting() { final ReentrantLock lock = this.lock; lock.lock(); try { return parties - count; } finally { lock.unlock(); } } //损坏旧屏障。创建新一代 public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // break the current generation nextGeneration(); // start a new generation } finally { lock.unlock(); } }
相关推荐
瓜牛呱呱 2020-11-12
柳木木的IT 2020-11-04
yifouhu 2020-11-02
lei0 2020-11-02
源码zanqunet 2020-10-28
源码zanqunet 2020-10-26
一叶梧桐 2020-10-14
码代码的陈同学 2020-10-14
lukezhong 2020-10-14
lzzyok 2020-10-10
anchongnanzi 2020-09-21
clh0 2020-09-18
changcongying 2020-09-17
星辰大海的路上 2020-09-13
abfdada 2020-08-26
mzy000 2020-08-24
shenlanse 2020-08-18
zhujiangtaotaise 2020-08-18
xiemanR 2020-08-17